Skip to content

Commit

Permalink
Implement Backblaze B2 as remote provider
Browse files Browse the repository at this point in the history
Signed-off-by: Dennis Urban (github@dennisurban.de)
  • Loading branch information
Dennis Urban committed Aug 5, 2024
1 parent 6759ced commit 3df1858
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 8 deletions.
70 changes: 64 additions & 6 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ type options struct {
WithRolePasswords bool
DumpOnly bool

Upload string // values are none, s3, sftp, gcs
Upload string // values are none, b2, s3, sftp, gcs
UploadPrefix string
Download string // values are none, s3, sftp, gcs
ListRemote string // values are none, s3, sftp, gcs
Download string // values are none, b2, s3, sftp, gcs
ListRemote string // values are none, b2, s3, sftp, gcs
PurgeRemote bool
S3Region string
S3Bucket string
Expand All @@ -96,6 +96,14 @@ type options struct {
S3ForcePath bool
S3DisableTLS bool

B2Region string
B2Bucket string
B2Endpoint string
B2KeyID string
B2AppKey string
B2ForcePath bool
B2ConcurrentUploads int

SFTPHost string
SFTPPort string
SFTPUsername string
Expand Down Expand Up @@ -294,6 +302,14 @@ func parseCli(args []string) (options, []string, error) {
pflag.StringVar(&opts.ListRemote, "list-remote", "none", "list the remote files on s3, gcs, sftp, azure instead of dumping. DBNAMEs become\nglobs to select files")
purgeRemote := pflag.String("purge-remote", "no", "purge the file on remote location after upload, with the same rules\nas the local directory")

pflag.StringVar(&opts.B2Region, "b2-region", "", "B2 region")
pflag.StringVar(&opts.B2Bucket, "b2-bucket", "", "B2 bucket")
pflag.StringVar(&opts.B2Endpoint, "b2-endpoint", "", "B2 endpoint")
pflag.StringVar(&opts.B2KeyID, "b2-key-id", "", "B2 access key ID")
pflag.StringVar(&opts.B2AppKey, "b2-app-key", "", "B2 app key")
B2ForcePath := pflag.String("b2-force-path", "no", "force path style addressing instead of virtual hosted bucket\naddressing")
B2ConcurrentUploads := pflag.Int("b2-concurrent-uploads", 5, "set the amount of concurrent b2 http uploads")

pflag.StringVar(&opts.S3Region, "s3-region", "", "S3 region")
pflag.StringVar(&opts.S3Bucket, "s3-bucket", "", "S3 bucket")
pflag.StringVar(&opts.S3Profile, "s3-profile", "", "AWS client profile name to get credentials")
Expand Down Expand Up @@ -447,7 +463,7 @@ func parseCli(args []string) (options, []string, error) {
}

// Validate upload and download options
stores := []string{"none", "s3", "sftp", "gcs", "azure"}
stores := []string{"none", "b2", "s3", "sftp", "gcs", "azure"}
if err := validateEnum(opts.Upload, stores); err != nil {
return opts, changed, fmt.Errorf("invalid value for --upload: %s", err)
}
Expand All @@ -467,6 +483,18 @@ func parseCli(args []string) (options, []string, error) {

for _, o := range []string{opts.Upload, opts.Download, opts.ListRemote} {
switch o {
case "b2":
opts.B2ForcePath, err = validateYesNoOption(*B2ForcePath)
if err != nil {
return opts, changed, fmt.Errorf("invalid value for --b2-force-path: %s", err)
}

if *B2ConcurrentUploads <= 0 {
return opts, changed, fmt.Errorf("b2 concurrent uploads must be more than 0 (current %d)", *B2ConcurrentUploads)
} else {
opts.B2ConcurrentUploads = *B2ConcurrentUploads
}

case "s3":
// Validate S3 options
opts.S3ForcePath, err = validateYesNoOption(*S3ForcePath)
Expand All @@ -493,13 +521,16 @@ func parseCli(args []string) (options, []string, error) {

func validateConfigurationFile(cfg *ini.File) error {
s, _ := cfg.GetSection(ini.DefaultSection)

known_globals := []string{
"bin_directory", "backup_directory", "timestamp_format", "host", "port", "user",
"dbname", "exclude_dbs", "include_dbs", "with_templates", "format",
"parallel_backup_jobs", "compress_level", "jobs", "pause_timeout",
"purge_older_than", "purge_min_keep", "checksum_algorithm", "pre_backup_hook",
"post_backup_hook", "encrypt", "cipher_pass", "cipher_public_key", "cipher_private_key",
"encrypt_keep_source", "upload", "purge_remote", "s3_region", "s3_bucket", "s3_endpoint",
"encrypt_keep_source", "upload", "purge_remote",
"b2_region", "b2_bucket", "b2_endpoint", "b2_key_id", "b2_app_key", "b2_force_path",
"b2_concurrent_uploads", "s3_region", "s3_bucket", "s3_endpoint",
"s3_profile", "s3_key_id", "s3_secret", "s3_force_path", "s3_tls", "sftp_host",
"sftp_port", "sftp_user", "sftp_password", "sftp_directory", "sftp_identity",
"sftp_ignore_hostkey", "gcs_bucket", "gcs_endpoint", "gcs_keyfile",
Expand Down Expand Up @@ -602,6 +633,14 @@ func loadConfigurationFile(path string) (options, error) {
opts.UploadPrefix = s.Key("upload_prefix").MustString("")
opts.PurgeRemote = s.Key("purge_remote").MustBool(false)

opts.B2Region = s.Key("b2_region").MustString("")
opts.B2Bucket = s.Key("b2_bucket").MustString("")
opts.B2Endpoint = s.Key("b2_endpoint").MustString("")
opts.B2KeyID = s.Key("b2_key_id").MustString("")
opts.B2AppKey = s.Key("b2_app_key").MustString("")
opts.B2ForcePath = s.Key("b2_force_path").MustBool(false)
opts.B2ConcurrentUploads = s.Key("b2_concurrent_uploads").MustInt(5)

opts.S3Region = s.Key("s3_region").MustString("")
opts.S3Bucket = s.Key("s3_bucket").MustString("")
opts.S3EndPoint = s.Key("s3_endpoint").MustString("")
Expand Down Expand Up @@ -660,8 +699,12 @@ func loadConfigurationFile(path string) (options, error) {
}
}

if opts.B2ConcurrentUploads <= 0 {
return opts, fmt.Errorf("b2 concurrent uploads must be more than 0 (current %d)", opts.B2ConcurrentUploads)
}

// Validate upload option
stores := []string{"none", "s3", "sftp", "gcs", "azure"}
stores := []string{"none", "b2", "s3", "sftp", "gcs", "azure"}
if err := validateEnum(opts.Upload, stores); err != nil {
return opts, fmt.Errorf("invalid value for upload: %s", err)
}
Expand Down Expand Up @@ -844,6 +887,21 @@ func mergeCliAndConfigOptions(cliOpts options, configOpts options, onCli []strin
case "purge-remote":
opts.PurgeRemote = cliOpts.PurgeRemote

case "b2-region":
opts.B2Region = cliOpts.B2Region
case "b2-bucket":
opts.B2Bucket = cliOpts.B2Bucket
case "b2-endpoint":
opts.B2Endpoint = cliOpts.B2Endpoint
case "b2-key-id":
opts.B2KeyID = cliOpts.B2KeyID
case "b2-app-key":
opts.B2AppKey = cliOpts.B2AppKey
case "b2-force-path":
opts.B2ForcePath = cliOpts.B2ForcePath
case "b2-concurrent-uploads":
opts.B2ConcurrentUploads = cliOpts.B2ConcurrentUploads

case "s3-region":
opts.S3Region = cliOpts.S3Region
case "s3-bucket":
Expand Down
4 changes: 2 additions & 2 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func TestParseCli(t *testing.T) {
},
false,
false,
"invalid value for --upload: value not found in [none s3 sftp gcs azure]",
"invalid value for --upload: value not found in [none b2 s3 sftp gcs azure]",
"",
},
{
Expand All @@ -350,7 +350,7 @@ func TestParseCli(t *testing.T) {
},
false,
false,
"invalid value for --download: value not found in [none s3 sftp gcs azure]",
"invalid value for --download: value not found in [none b2 s3 sftp gcs azure]",
"",
},
{
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
cloud.google.com/go/iam v1.1.8 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.12.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.9.0 // indirect
github.com/Backblaze/blazer v0.6.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 h1:YUUxeiOWgdAQE3pXt
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2/go.mod h1:dmXQgZuiSubAecswZE+Sm8jkvEa7kQgTPVRvwL/nd0E=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
github.com/Backblaze/blazer v0.6.1 h1:xC9HyC7OcxRzzmtfRiikIEvq4HZYWjU6caFwX2EXw1s=
github.com/Backblaze/blazer v0.6.1/go.mod h1:7/jrGx4O6OKOto6av+hLwelPR8rwZ+PLxQ5ZOiYAjwY=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030IGemrRc=
github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs=
Expand Down
9 changes: 9 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ func run() (retVal error) {
return fmt.Errorf("a bucket is mandatory with s3")
}

if (opts.Upload == "b2" || opts.Download == "b2" || opts.ListRemote == "b2") && opts.B2Bucket == "" {
return fmt.Errorf("a bucket is mandatory with Bb2")
}

if (opts.Upload == "gcs" || opts.Download == "gcs" || opts.ListRemote == "gcs") && opts.GCSBucket == "" {
return fmt.Errorf("a bucket is mandatory with gcs")
}
Expand Down Expand Up @@ -513,6 +517,11 @@ func run() (retVal error) {
if err != nil {
return fmt.Errorf("failed to prepare upload to S3: %w", err)
}
case "b2":
repo, err = NewB2Repo(opts)
if err != nil {
return fmt.Errorf("failed to prepare upload to S3: %w", err)
}
case "sftp":
repo, err = NewSFTPRepo(opts)
if err != nil {
Expand Down
131 changes: 131 additions & 0 deletions upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"path/filepath"
"strings"
"time"
"github.com/Backblaze/blazer/b2"
)

// A Repo is a remote service where we can upload files
Expand Down Expand Up @@ -93,6 +94,11 @@ func NewRepo(kind string, opts options) (Repo, error) {
if err != nil {
return nil, fmt.Errorf("failed to prepare S3 repo: %w", err)
}
case "b2":
repo, err = NewB2Repo(opts)
if err != nil {
return nil, fmt.Errorf("failed to prepare B2 repo: %w", err)
}
case "sftp":
repo, err = NewSFTPRepo(opts)
if err != nil {
Expand All @@ -113,6 +119,19 @@ func NewRepo(kind string, opts options) (Repo, error) {
return repo, nil
}

// b2repo is a Repo implementation that stores dump files in a
// Backblaze B2 bucket through the blazer client library.
type b2repo struct {
	concurrentUploads int    // parallel HTTP uploads per object, copied into b2.Writer.ConcurrentUploads
	bucket            string // name of the target B2 bucket
	region            string // B2 region from the configuration; NOTE(review): not used by the blazer calls visible here — confirm
	endpoint          string // custom endpoint from the configuration; NOTE(review): not passed to b2.NewClient in NewB2Repo — confirm
	keyID             string // application key ID used for authentication
	appKey            string // application key secret used for authentication
	forcePath         bool   // force path-style addressing; NOTE(review): not used by the visible code — confirm
	b2Client          *b2.Client // authenticated client, set by NewB2Repo
	b2Bucket          *b2.Bucket // handle on bucket, set by NewB2Repo
	ctx               context.Context // NOTE(review): storing a context in a struct is discouraged; consider passing a ctx per call
}

type s3repo struct {
region string
bucket string
Expand All @@ -125,6 +144,118 @@ type s3repo struct {
session *session.Session
}

// NewB2Repo authenticates against Backblaze B2 with the credentials
// found in opts and returns a repository bound to the configured
// bucket. It fails when the client cannot be created or the bucket
// cannot be opened.
func NewB2Repo(opts options) (*b2repo, error) {
	repo := &b2repo{
		region:            opts.B2Region,
		bucket:            opts.B2Bucket,
		endpoint:          opts.B2Endpoint,
		keyID:             opts.B2KeyID,
		appKey:            opts.B2AppKey,
		concurrentUploads: opts.B2ConcurrentUploads,
		forcePath:         opts.B2ForcePath,
		ctx:               context.Background(),
	}

	l.Verbosef("starting b2 client with %d connections to %s %s \n", repo.concurrentUploads, repo.endpoint, repo.bucket)

	c, err := b2.NewClient(repo.ctx, repo.keyID, repo.appKey)
	if err != nil {
		return nil, fmt.Errorf("could not create B2 session: %w", err)
	}
	repo.b2Client = c

	b, err := repo.b2Client.Bucket(repo.ctx, repo.bucket)
	if err != nil {
		return nil, fmt.Errorf("could not connect to B2 bucket: %w", err)
	}
	repo.b2Bucket = b

	return repo, nil
}

// Upload copies the local file at path into the bucket under the
// remote key target, using the configured number of concurrent
// uploads.
func (r *b2repo) Upload(path string, target string) error {
	src, err := os.Open(path)
	if err != nil {
		return err
	}
	defer src.Close()

	dst := r.b2Bucket.Object(target).NewWriter(r.ctx)
	dst.ConcurrentUploads = r.concurrentUploads

	_, err = io.Copy(dst, src)
	if err != nil {
		// abort the upload; keep the copy error, not the close error
		dst.Close()
		return err
	}

	// Close finalizes the B2 upload, so its error matters
	return dst.Close()
}

// Download fetches the remote object named target from the bucket and
// writes its contents to a local file created at path.
//
// Bug fix vs the previous version: it created two local files (one at
// path, one wrongly named after the remote key target), opened the
// remote reader with the *local* path instead of the remote target,
// and copied into one file handle while closing the other. This
// version opens exactly one local file, reads the correct remote
// object, and reports the final Close error so short writes are not
// silently lost.
func (r *b2repo) Download(target string, path string) error {
	file, err := os.Create(path)
	if err != nil {
		return fmt.Errorf("download error: %w", err)
	}

	remoteFile := r.b2Bucket.Object(target).NewReader(r.ctx)
	defer remoteFile.Close()

	if _, err := io.Copy(file, remoteFile); err != nil {
		file.Close()
		return err
	}

	return file.Close()
}

// Close releases resources held by the repository. The blazer client
// needs no explicit teardown, so this is a no-op kept to satisfy the
// Repo interface.
func (r *b2repo) Close() error {
	return nil
}

// List returns the objects in the bucket whose keys start with prefix,
// with their last-modified times. An attribute lookup failure aborts
// the listing.
func (r *b2repo) List(prefix string) ([]Item, error) {
	items := make([]Item, 0)

	iter := r.b2Bucket.List(r.ctx, b2.ListPrefix(prefix))
	for iter.Next() {
		obj := iter.Object()

		attrs, err := obj.Attrs(r.ctx)
		if err != nil {
			return nil, err
		}

		items = append(items, Item{
			key:     obj.Name(),
			modtime: attrs.LastModified,
		})
	}

	// iter.Err() reports any error that stopped the iteration
	return items, iter.Err()
}

// Remove deletes the remote object stored under path in the bucket.
func (r *b2repo) Remove(path string) error {
	ctx, cancel := context.WithCancel(r.ctx)
	defer cancel()

	return r.b2Bucket.Object(path).Delete(ctx)
}

func NewS3Repo(opts options) (*s3repo, error) {
r := &s3repo{
region: opts.S3Region,
Expand Down

0 comments on commit 3df1858

Please sign in to comment.