Skip to content

Commit

Permalink
Read-only opens
Browse files Browse the repository at this point in the history
This introduces a read-only option on the DB struct. The main effect
is to force all transactions to be read-only. It also skips anything
that might write to disk.

It will put a shared lock on the directory when it's opened
read-only. This will allow other read-only opens but will deny
read-write opens until all readers have closed. If the database is
already open read-write and an attempt is made to open it
again (read-only or read-write), an error occurs.

It also:

 - Fails if the manifest doesn't exist on read-only open
 - Does not attempt to truncate the manifest
 - Skips compactors and memtable
 - All vlogs are opened read-only
  • Loading branch information
Allen Luce committed Mar 7, 2018
1 parent e8ce3e9 commit 273e532
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 50 deletions.
41 changes: 25 additions & 16 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ func Open(opt Options) (db *DB, err error) {
return nil, y.Wrapf(err, "Invalid Dir: %q", path)
}
if !dirExists {
if opt.ReadOnly {
return nil, y.Wrapf(err, "Cannot find Dir for read-only open: %q", path)
}
// Try to create the directory
err = os.Mkdir(path, 0700)
if err != nil {
Expand All @@ -188,8 +191,8 @@ func Open(opt Options) (db *DB, err error) {
if err != nil {
return nil, err
}

dirLockGuard, err := acquireDirectoryLock(opt.Dir, lockFile)
var dirLockGuard, valueDirLockGuard *directoryLockGuard
dirLockGuard, err = acquireDirectoryLock(opt.Dir, lockFile, opt.ReadOnly)
if err != nil {
return nil, err
}
Expand All @@ -198,9 +201,8 @@ func Open(opt Options) (db *DB, err error) {
_ = dirLockGuard.release()
}
}()
var valueDirLockGuard *directoryLockGuard
if absValueDir != absDir {
valueDirLockGuard, err = acquireDirectoryLock(opt.ValueDir, lockFile)
valueDirLockGuard, err = acquireDirectoryLock(opt.ValueDir, lockFile, opt.ReadOnly)
if err != nil {
return nil, err
}
Expand All @@ -217,7 +219,7 @@ func Open(opt Options) (db *DB, err error) {
opt.ValueLogLoadingMode == options.MemoryMap) {
return nil, ErrInvalidLoadingMode
}
manifestFile, manifest, err := openOrCreateManifestFile(opt.Dir)
manifestFile, manifest, err := openOrCreateManifestFile(opt.Dir, opt.ReadOnly)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -256,11 +258,13 @@ func Open(opt Options) (db *DB, err error) {
return nil, err
}

db.closers.compactors = y.NewCloser(1)
db.lc.startCompact(db.closers.compactors)
if !opt.ReadOnly {
db.closers.compactors = y.NewCloser(1)
db.lc.startCompact(db.closers.compactors)

db.closers.memtable = y.NewCloser(1)
go db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
db.closers.memtable = y.NewCloser(1)
go db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
}

if err = db.vlog.Open(db, opt); err != nil {
return nil, err
Expand Down Expand Up @@ -363,11 +367,14 @@ func (db *DB) Close() (err error) {
}
db.flushChan <- flushTask{nil, valuePointer{}} // Tell flusher to quit.

db.closers.memtable.Wait()
db.elog.Printf("Memtable flushed")

db.closers.compactors.SignalAndWait()
db.elog.Printf("Compaction finished")
if db.closers.memtable != nil {
db.closers.memtable.Wait()
db.elog.Printf("Memtable flushed")
}
if db.closers.compactors != nil {
db.closers.compactors.SignalAndWait()
db.elog.Printf("Compaction finished")
}

if lcErr := db.lc.close(); err == nil {
err = errors.Wrap(lcErr, "DB.Close")
Expand All @@ -377,8 +384,10 @@ func (db *DB) Close() (err error) {

db.elog.Finish()

if guardErr := db.dirLockGuard.release(); err == nil {
err = errors.Wrap(guardErr, "DB.Close")
if db.dirLockGuard != nil {
if guardErr := db.dirLockGuard.release(); err == nil {
err = errors.Wrap(guardErr, "DB.Close")
}
}
if db.valueDirGuard != nil {
if guardErr := db.valueDirGuard.release(); err == nil {
Expand Down
67 changes: 67 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1506,6 +1506,73 @@ func TestMergeOperatorGetAfterStop(t *testing.T) {
})
}

func TestReadOnly(t *testing.T) {
dir, err := ioutil.TempDir("", "badger")
require.NoError(t, err)
defer os.RemoveAll(dir)
opts := getTestOptions(dir)

// Create the DB
db, err := Open(opts)
require.NoError(t, err)
for i := 0; i < 10000; i++ {
txnSet(t, db, []byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i)), 0x00)
}

// Attempt a read-only open while it's open read-write.
opts.ReadOnly = true
_, err = Open(opts)
require.Error(t, err)
require.Contains(t, err.Error(), "Another process is using this Badger database")

db.Close()

// Open one read-only
opts.ReadOnly = true
kv1, err := Open(opts)
require.NoError(t, err)
defer kv1.Close()

// Open another read-only
kv2, err := Open(opts)
require.NoError(t, err)
defer kv2.Close()

// Attempt a read-write open while it's open for read-only
opts.ReadOnly = false
_, err = Open(opts)
require.Error(t, err)
require.Contains(t, err.Error(), "Another process is using this Badger database")

// Get a thing from the DB
txn1 := kv1.NewTransaction(true)
v1, err := txn1.Get([]byte("key1"))
require.NoError(t, err)
b1, err := v1.Value()
require.NoError(t, err)
require.Equal(t, b1, []byte("value1"))
err = txn1.Commit(nil)
require.NoError(t, err)

// Get a thing from the DB via the other connection
txn2 := kv2.NewTransaction(true)
v2, err := txn2.Get([]byte("key2000"))
require.NoError(t, err)
b2, err := v2.Value()
require.NoError(t, err)
require.Equal(t, b2, []byte("value2000"))
err = txn2.Commit(nil)
require.NoError(t, err)

// Attempt to set a value on a read-only connection
txn := kv1.NewTransaction(true)
err = txn.SetWithMeta([]byte("key"), []byte("value"), 0x00)
require.Error(t, err)
require.Contains(t, err.Error(), "No sets or deletes are allowed in a read-only transaction")
err = txn.Commit(nil)
require.NoError(t, err)
}

func ExampleOpen() {
dir, err := ioutil.TempDir("", "badger")
if err != nil {
Expand Down
43 changes: 28 additions & 15 deletions dir_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ type directoryLockGuard struct {
f *os.File
// The absolute path to our pid file.
path string
// Was this a shared lock for a read-only database?
readOnly bool
}

// acquireDirectoryLock gets an exclusive lock on the directory (using flock). It writes our pid
// to dirPath/pidFileName for convenience.
func acquireDirectoryLock(dirPath string, pidFileName string) (*directoryLockGuard, error) {
// acquireDirectoryLock gets a lock on the directory (using flock). If
// this is not read-only, it will also write our pid to
// dirPath/pidFileName for convenience.
func acquireDirectoryLock(dirPath string, pidFileName string, readOnly bool) (*directoryLockGuard, error) {
// Convert to absolute path so that Release still works even if we do an unbalanced
// chdir in the meantime.
absPidFilePath, err := filepath.Abs(filepath.Join(dirPath, pidFileName))
Expand All @@ -50,30 +53,40 @@ func acquireDirectoryLock(dirPath string, pidFileName string) (*directoryLockGua
if err != nil {
return nil, errors.Wrapf(err, "cannot open directory %q", dirPath)
}
err = unix.Flock(int(f.Fd()), unix.LOCK_EX|unix.LOCK_NB)
opts := unix.LOCK_EX | unix.LOCK_NB
if readOnly {
opts = unix.LOCK_SH | unix.LOCK_NB
}

err = unix.Flock(int(f.Fd()), opts)
if err != nil {
f.Close()
return nil, errors.Wrapf(err,
"Cannot acquire directory lock on %q. Another process is using this Badger database.",
dirPath)
}

// Yes, we happily overwrite a pre-existing pid file. We're the only badger process using this
// directory.
err = ioutil.WriteFile(absPidFilePath, []byte(fmt.Sprintf("%d\n", os.Getpid())), 0666)
if err != nil {
f.Close()
return nil, errors.Wrapf(err,
"Cannot write pid file %q", absPidFilePath)
if !readOnly {
// Yes, we happily overwrite a pre-existing pid file. We're the
// only read-write badger process using this directory.
err = ioutil.WriteFile(absPidFilePath, []byte(fmt.Sprintf("%d\n", os.Getpid())), 0666)
if err != nil {
f.Close()
return nil, errors.Wrapf(err,
"Cannot write pid file %q", absPidFilePath)
}
}

return &directoryLockGuard{f, absPidFilePath}, nil
return &directoryLockGuard{f, absPidFilePath, readOnly}, nil
}

// Release deletes the pid file and releases our lock on the directory.
func (guard *directoryLockGuard) release() error {
// It's important that we remove the pid file first.
err := os.Remove(guard.path)
var err error
if !guard.readOnly {
// It's important that we remove the pid file first.
err = os.Remove(guard.path)
}

if closeErr := guard.f.Close(); err == nil {
err = closeErr
}
Expand Down
2 changes: 1 addition & 1 deletion levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func newLevelsController(kv *DB, mf *Manifest) (*levelsController, error) {
var maxFileID uint64
for fileID, tableManifest := range mf.Tables {
fname := table.NewFilename(fileID, kv.opt.Dir)
fd, err := y.OpenExistingSyncedFile(fname, true)
fd, err := y.OpenExistingSyncedFile(fname, true, kv.opt.ReadOnly)
if err != nil {
closeAllTables(tables)
return nil, errors.Wrapf(err, "Opening file: %q", fname)
Expand Down
24 changes: 14 additions & 10 deletions manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,20 @@ func (m *Manifest) clone() Manifest {

// openOrCreateManifestFile opens a Badger manifest file if it exists, or creates on if
// one doesn’t.
func openOrCreateManifestFile(dir string) (ret *manifestFile, result Manifest, err error) {
return helpOpenOrCreateManifestFile(dir, manifestDeletionsRewriteThreshold)
func openOrCreateManifestFile(dir string, readOnly bool) (ret *manifestFile, result Manifest, err error) {
return helpOpenOrCreateManifestFile(dir, readOnly, manifestDeletionsRewriteThreshold)
}

func helpOpenOrCreateManifestFile(dir string, deletionsThreshold int) (ret *manifestFile, result Manifest, err error) {
func helpOpenOrCreateManifestFile(dir string, readOnly bool, deletionsThreshold int) (ret *manifestFile, result Manifest, err error) {
path := filepath.Join(dir, ManifestFilename)
fp, err := y.OpenExistingSyncedFile(path, false) // We explicitly sync in addChanges, outside the lock.
fp, err := y.OpenExistingSyncedFile(path, false, readOnly) // We explicitly sync in addChanges, outside the lock.
if err != nil {
if !os.IsNotExist(err) {
return nil, Manifest{}, err
}
if readOnly {
return nil, Manifest{}, fmt.Errorf("no manifest found, required for read-only db")
}
m := createManifest()
fp, netCreations, err := helpRewrite(dir, &m)
if err != nil {
Expand All @@ -144,12 +147,13 @@ func helpOpenOrCreateManifestFile(dir string, deletionsThreshold int) (ret *mani
return nil, Manifest{}, err
}

// Truncate file so we don't have a half-written entry at the end.
if err := fp.Truncate(truncOffset); err != nil {
_ = fp.Close()
return nil, Manifest{}, err
if !readOnly {
// Truncate file so we don't have a half-written entry at the end.
if err := fp.Truncate(truncOffset); err != nil {
_ = fp.Close()
return nil, Manifest{}, err
}
}

if _, err = fp.Seek(0, io.SeekEnd); err != nil {
_ = fp.Close()
return nil, Manifest{}, err
Expand Down Expand Up @@ -256,7 +260,7 @@ func helpRewrite(dir string, m *Manifest) (*os.File, int, error) {
if err := os.Rename(rewritePath, manifestPath); err != nil {
return nil, 0, err
}
fp, err = y.OpenExistingSyncedFile(manifestPath, false)
fp, err = y.OpenExistingSyncedFile(manifestPath, false, false)
if err != nil {
return nil, 0, err
}
Expand Down
4 changes: 2 additions & 2 deletions manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func TestManifestRewrite(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)
deletionsThreshold := 10
mf, m, err := helpOpenOrCreateManifestFile(dir, deletionsThreshold)
mf, m, err := helpOpenOrCreateManifestFile(dir, false, deletionsThreshold)
defer func() {
if mf != nil {
mf.close()
Expand All @@ -236,7 +236,7 @@ func TestManifestRewrite(t *testing.T) {
err = mf.close()
require.NoError(t, err)
mf = nil
mf, m, err = helpOpenOrCreateManifestFile(dir, deletionsThreshold)
mf, m, err = helpOpenOrCreateManifestFile(dir, false, deletionsThreshold)
require.NoError(t, err)
require.Equal(t, map[uint64]tableManifest{
uint64(deletionsThreshold * 3): {Level: 0},
Expand Down
4 changes: 4 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ type Options struct {

maxBatchCount int64 // max entries in batch
maxBatchSize int64 // max batch size in bytes

// Whether the DB allows writes. With ReadOnly set, multiple Opens
// can happen.
ReadOnly bool
}

// DefaultOptions sets a list of recommended options for good performance.
Expand Down
5 changes: 5 additions & 0 deletions transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,11 @@ func (txn *Txn) Commit(callback func(error)) error {
// defer txn.Discard()
// // Call various APIs.
func (db *DB) NewTransaction(update bool) *Txn {
// Is the DB read-only?
if db.opt.ReadOnly && update { // Force read-only transaction.
update = false
}

txn := &Txn{
update: update,
db: db,
Expand Down
10 changes: 5 additions & 5 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func (vlog *valueLog) fpath(fid uint32) string {
return vlogFilePath(vlog.dirPath, fid)
}

func (vlog *valueLog) openOrCreateFiles() error {
func (vlog *valueLog) openOrCreateFiles(readOnly bool) error {
files, err := ioutil.ReadDir(vlog.dirPath)
if err != nil {
return errors.Wrapf(err, "Error while opening value log")
Expand Down Expand Up @@ -551,12 +551,12 @@ func (vlog *valueLog) openOrCreateFiles() error {
vlog.maxFid = uint32(maxFid)

// Open all previous log files as read only. Open the last log file
// as read write.
// as read write (unless the DB is read only).
for fid, lf := range vlog.filesMap {
if fid == maxFid {
if lf.fd, err = y.OpenExistingSyncedFile(vlog.fpath(fid),
vlog.opt.SyncWrites); err != nil {
return errors.Wrapf(err, "Unable to open value log file as RDWR")
vlog.opt.SyncWrites, readOnly); err != nil {
return errors.Wrapf(err, "Unable to open value log file")
}
} else {
if err := lf.openReadOnly(); err != nil {
Expand Down Expand Up @@ -602,7 +602,7 @@ func (vlog *valueLog) Open(kv *DB, opt Options) error {
vlog.opt = opt
vlog.kv = kv
vlog.filesMap = make(map[uint32]*logFile)
if err := vlog.openOrCreateFiles(); err != nil {
if err := vlog.openOrCreateFiles(kv.opt.ReadOnly); err != nil {
return errors.Wrapf(err, "Unable to open value log")
}

Expand Down
6 changes: 5 additions & 1 deletion y/y.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@ var (
)

// OpenExistingSyncedFile opens an existing file, errors if it doesn't exist.
func OpenExistingSyncedFile(filename string, sync bool) (*os.File, error) {
func OpenExistingSyncedFile(filename string, sync bool, readonly bool) (*os.File, error) {
flags := os.O_RDWR
if readonly {
flags = os.O_RDONLY
}

if sync {
flags |= datasyncFileFlag
}
Expand Down

0 comments on commit 273e532

Please sign in to comment.