Skip to content

Commit

Permalink
feat: migrate oplog history from bbolt to sqlite store (#515)
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge authored Oct 12, 2024
1 parent 4fa30e3 commit 0806eb9
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 26 deletions.
81 changes: 70 additions & 11 deletions cmd/backrest/backrest.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync/atomic"
"syscall"

v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/gen/go/v1/v1connect"
"github.com/garethgeorge/backrest/internal/api"
"github.com/garethgeorge/backrest/internal/auth"
Expand All @@ -24,11 +25,11 @@ import (
"github.com/garethgeorge/backrest/internal/metric"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/oplog/bboltstore"
"github.com/garethgeorge/backrest/internal/oplog/sqlitestore"
"github.com/garethgeorge/backrest/internal/orchestrator"
"github.com/garethgeorge/backrest/internal/resticinstaller"
"github.com/garethgeorge/backrest/webui"
"github.com/mattn/go-colorable"
"go.etcd.io/bbolt"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/net/http2"
Expand Down Expand Up @@ -66,21 +67,19 @@ func main() {
var wg sync.WaitGroup

// Create / load the operation log
oplogFile := path.Join(env.DataDir(), "oplog.boltdb")
opstore, err := bboltstore.NewBboltStore(oplogFile)
oplogFile := path.Join(env.DataDir(), "oplog.sqlite")
opstore, err := sqlitestore.NewSqliteStore(oplogFile)
if err != nil {
if !errors.Is(err, bbolt.ErrTimeout) {
zap.S().Fatalf("timeout while waiting to open database, is the database open elsewhere?")
}
zap.S().Warnf("operation log may be corrupted, if errors recur delete the file %q and restart. Your backups stored in your repos are safe.", oplogFile)
zap.S().Fatalf("error creating oplog : %v", err)
zap.S().Fatalf("error creating oplog: %v", err)
}
defer opstore.Close()

oplog, err := oplog.NewOpLog(opstore)
log, err := oplog.NewOpLog(opstore)
if err != nil {
zap.S().Fatalf("error creating oplog: %v", err)
}
migrateBboltOplog(opstore)

// Create rotating log storage
logStore, err := logwriter.NewLogManager(path.Join(env.DataDir(), "rotatinglogs"), 14) // 14 days of logs
Expand All @@ -89,7 +88,7 @@ func main() {
}

// Create orchestrator and start task loop.
orchestrator, err := orchestrator.NewOrchestrator(resticPath, cfg, oplog, logStore)
orchestrator, err := orchestrator.NewOrchestrator(resticPath, cfg, log, logStore)
if err != nil {
zap.S().Fatalf("error creating orchestrator: %v", err)
}
Expand All @@ -104,7 +103,7 @@ func main() {
apiBackrestHandler := api.NewBackrestHandler(
configStore,
orchestrator,
oplog,
log,
logStore,
)

Expand All @@ -116,7 +115,7 @@ func main() {
backrestHandlerPath, backrestHandler := v1connect.NewBackrestHandler(apiBackrestHandler)
mux.Handle(backrestHandlerPath, auth.RequireAuthentication(backrestHandler, authenticator))
mux.Handle("/", webui.Handler())
mux.Handle("/download/", http.StripPrefix("/download", api.NewDownloadHandler(oplog)))
mux.Handle("/download/", http.StripPrefix("/download", api.NewDownloadHandler(log)))
mux.Handle("/metrics", auth.RequireAuthentication(metric.GetRegistry().Handler(), authenticator))

// Serve the HTTP gateway
Expand Down Expand Up @@ -225,3 +224,63 @@ func installLoggers() {
zap.ReplaceGlobals(zap.New(zapcore.NewTee(pretty, ugly)))
zap.S().Infof("writing logs to: %v", logsDir)
}

func migrateBboltOplog(logstore oplog.OpStore) {
oldBboltOplogFile := path.Join(env.DataDir(), "oplog.boltdb")
if _, err := os.Stat(oldBboltOplogFile); err == nil {
zap.S().Warnf("found old bbolt oplog file %q, migrating to sqlite", oldBboltOplogFile)
oldOpstore, err := bboltstore.NewBboltStore(oldBboltOplogFile)
if err != nil {
zap.S().Fatalf("error opening old bbolt oplog: %v", err)
}

oldOplog, err := oplog.NewOpLog(oldOpstore)
if err != nil {
zap.S().Fatalf("error creating old bbolt oplog: %v", err)
}

batch := make([]*v1.Operation, 0, 32)

var errs []error

if err := oldOplog.Query(oplog.Query{}, func(op *v1.Operation) error {
batch = append(batch, op)
if len(batch) == 256 {
if err := logstore.Add(batch...); err != nil {
errs = append(errs, err)
zap.S().Warnf("error migrating %d operations: %v", len(batch), err)
} else {
zap.S().Debugf("migrated %d oplog operations from bbolt to sqlite store", len(batch))
}
batch = batch[:0]
}
return nil
}); err != nil {
zap.S().Warnf("couldn't migrate all operations from the old bbolt oplog, if this recurs delete the file %q and restart", oldBboltOplogFile)
zap.S().Fatalf("error migrating old bbolt oplog: %v", err)
}

if len(batch) > 0 {
if err := logstore.Add(batch...); err != nil {
errs = append(errs, err)
zap.S().Warnf("error migrating %d operations: %v", len(batch), err)
} else {
zap.S().Debugf("migrated %d oplog operations from bbolt to sqlite store", len(batch))
}
zap.S().Debugf("migrated %d oplog operations from bbolt to sqlite store", len(batch))
}

if len(errs) > 0 {
zap.S().Fatalf("encountered %d errors migrating old bbolt oplog, see logs for details. If this probelem recurs delete the file %q and restart", len(errs), oldBboltOplogFile)
}

if err := oldOpstore.Close(); err != nil {
zap.S().Warnf("error closing old bbolt oplog: %v", err)
}
if err := os.Remove(oldBboltOplogFile); err != nil {
zap.S().Warnf("error removing old bbolt oplog: %v", err)
}

zap.S().Info("migrated old bbolt oplog to sqlite")
}
}
32 changes: 17 additions & 15 deletions internal/oplog/sqlitestore/sqlitestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package sqlitestore

import (
"context"
"crypto/rand"
"errors"
"fmt"
"math/big"
"strings"
"sync/atomic"
"time"

v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/internal/oplog"
Expand All @@ -26,7 +29,7 @@ var _ oplog.OpStore = (*SqliteStore)(nil)
func NewSqliteStore(db string) (*SqliteStore, error) {
dbpool, err := sqlitex.NewPool(db, sqlitex.PoolOptions{
PoolSize: 16,
Flags: sqlite.OpenReadWrite | sqlite.OpenCreate | sqlite.OpenWAL | sqlite.OpenSharedCache,
Flags: sqlite.OpenReadWrite | sqlite.OpenCreate | sqlite.OpenWAL,
})
if err != nil {
return nil, fmt.Errorf("open sqlite pool: %v", err)
Expand Down Expand Up @@ -72,22 +75,18 @@ SELECT 0 WHERE NOT EXISTS (SELECT 1 FROM system_info);
return fmt.Errorf("init sqlite: %v", err)
}

// find the next id value
if err := sqlitex.ExecuteTransient(conn, "SELECT MAX(id) FROM operations", &sqlitex.ExecOptions{
ResultFunc: func(stmt *sqlite.Stmt) error {
m.nextIDVal.Store(stmt.GetInt64("MAX(id)") + 1)
return nil
},
}); err != nil {
return fmt.Errorf("get max ID: %v", err)
}
if m.nextIDVal.Load() == 0 {
m.nextIDVal.Store(1)
}
// rand init value
n, _ := rand.Int(rand.Reader, big.NewInt(1<<20))
m.nextIDVal.Store(n.Int64())

return nil
}

func (o *SqliteStore) nextID(unixTimeMs int64) (int64, error) {
seq := o.nextIDVal.Add(1)
return int64(unixTimeMs<<20) | int64(seq&((1<<20)-1)), nil
}

func (m *SqliteStore) Version() (int64, error) {
conn, err := m.dbpool.Take(context.Background())
if err != nil {
Expand Down Expand Up @@ -262,7 +261,10 @@ func (m *SqliteStore) Add(op ...*v1.Operation) error {

return withSqliteTransaction(conn, func() error {
for _, o := range op {
o.Id = m.nextIDVal.Add(1)
o.Id, err = m.nextID(time.Now().UnixMilli())
if err != nil {
return fmt.Errorf("generate operation id: %v", err)
}
if o.FlowId == 0 {
o.FlowId = o.Id
}
Expand All @@ -281,7 +283,7 @@ func (m *SqliteStore) Add(op ...*v1.Operation) error {
Args: []any{o.Id, o.FlowId, o.InstanceId, o.PlanId, o.RepoId, o.SnapshotId, bytes},
}); err != nil {
if sqlite.ErrCode(err) == sqlite.ResultConstraintUnique {
return fmt.Errorf("operation already exists: %w", oplog.ErrExist)
return fmt.Errorf("operation already exists %v: %w", o.Id, oplog.ErrExist)
}
return fmt.Errorf("add operation: %v", err)
}
Expand Down

0 comments on commit 0806eb9

Please sign in to comment.