From 3df18581b9236c4e31e9749d96898cc18b2a6a28 Mon Sep 17 00:00:00 2001 From: Dennis Urban Date: Mon, 5 Aug 2024 07:40:50 +0200 Subject: [PATCH] Implement Backblaze B2 as remote provider Signed-off-by: Dennis Urban (github@dennisurban.de) --- config.go | 70 +++++++++++++++++++++++--- config_test.go | 4 +- go.mod | 1 + go.sum | 2 + main.go | 9 ++++ upload.go | 131 +++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 209 insertions(+), 8 deletions(-) diff --git a/config.go b/config.go index 09e0d0e..6932d77 100644 --- a/config.go +++ b/config.go @@ -82,10 +82,10 @@ type options struct { WithRolePasswords bool DumpOnly bool - Upload string // values are none, s3, sftp, gcs + Upload string // values are none, b2, s3, sftp, gcs UploadPrefix string - Download string // values are none, s3, sftp, gcs - ListRemote string // values are none, s3, sftp, gcs + Download string // values are none, b2, s3, sftp, gcs + ListRemote string // values are none, b2, s3, sftp, gcs PurgeRemote bool S3Region string S3Bucket string @@ -96,6 +96,14 @@ type options struct { S3ForcePath bool S3DisableTLS bool + B2Region string + B2Bucket string + B2Endpoint string + B2KeyID string + B2AppKey string + B2ForcePath bool + B2ConcurrentUploads int + SFTPHost string SFTPPort string SFTPUsername string @@ -294,6 +302,14 @@ func parseCli(args []string) (options, []string, error) { pflag.StringVar(&opts.ListRemote, "list-remote", "none", "list the remote files on s3, gcs, sftp, azure instead of dumping. 
DBNAMEs become\nglobs to select files") purgeRemote := pflag.String("purge-remote", "no", "purge the file on remote location after upload, with the same rules\nas the local directory") + pflag.StringVar(&opts.B2Region, "b2-region", "", "B2 region") + pflag.StringVar(&opts.B2Bucket, "b2-bucket", "", "B2 bucket") + pflag.StringVar(&opts.B2Endpoint, "b2-endpoint", "", "B2 endpoint") + pflag.StringVar(&opts.B2KeyID, "b2-key-id", "", "B2 access key ID") + pflag.StringVar(&opts.B2AppKey, "b2-app-key", "", "B2 app key") + B2ForcePath := pflag.String("b2-force-path", "no", "force path style addressing instead of virtual hosted bucket\naddressing") + B2ConcurrentUploads := pflag.Int("b2-concurrent-uploads", 5, "set the amount of concurrent b2 http uploads") + pflag.StringVar(&opts.S3Region, "s3-region", "", "S3 region") pflag.StringVar(&opts.S3Bucket, "s3-bucket", "", "S3 bucket") pflag.StringVar(&opts.S3Profile, "s3-profile", "", "AWS client profile name to get credentials") @@ -447,7 +463,7 @@ func parseCli(args []string) (options, []string, error) { } // Validate upload and download options - stores := []string{"none", "s3", "sftp", "gcs", "azure"} + stores := []string{"none", "b2", "s3", "sftp", "gcs", "azure"} if err := validateEnum(opts.Upload, stores); err != nil { return opts, changed, fmt.Errorf("invalid value for --upload: %s", err) } @@ -467,6 +483,18 @@ func parseCli(args []string) (options, []string, error) { for _, o := range []string{opts.Upload, opts.Download, opts.ListRemote} { switch o { + case "b2": + opts.B2ForcePath, err = validateYesNoOption(*B2ForcePath) + if err != nil { + return opts, changed, fmt.Errorf("invalid value for --b2-force-path: %s", err) + } + + if *B2ConcurrentUploads <= 0 { + return opts, changed, fmt.Errorf("b2 concurrent uploads must be more than 0 (current %d)", *B2ConcurrentUploads) + } else { + opts.B2ConcurrentUploads = *B2ConcurrentUploads + } + case "s3": // Validate S3 options opts.S3ForcePath, err = 
validateYesNoOption(*S3ForcePath) @@ -493,13 +521,16 @@ func parseCli(args []string) (options, []string, error) { func validateConfigurationFile(cfg *ini.File) error { s, _ := cfg.GetSection(ini.DefaultSection) + known_globals := []string{ "bin_directory", "backup_directory", "timestamp_format", "host", "port", "user", "dbname", "exclude_dbs", "include_dbs", "with_templates", "format", "parallel_backup_jobs", "compress_level", "jobs", "pause_timeout", "purge_older_than", "purge_min_keep", "checksum_algorithm", "pre_backup_hook", "post_backup_hook", "encrypt", "cipher_pass", "cipher_public_key", "cipher_private_key", - "encrypt_keep_source", "upload", "purge_remote", "s3_region", "s3_bucket", "s3_endpoint", + "encrypt_keep_source", "upload", "purge_remote", + "b2_region", "b2_bucket", "b2_endpoint", "b2_key_id", "b2_app_key", "b2_force_path", + "b2_concurrent_uploads", "s3_region", "s3_bucket", "s3_endpoint", "s3_profile", "s3_key_id", "s3_secret", "s3_force_path", "s3_tls", "sftp_host", "sftp_port", "sftp_user", "sftp_password", "sftp_directory", "sftp_identity", "sftp_ignore_hostkey", "gcs_bucket", "gcs_endpoint", "gcs_keyfile", @@ -602,6 +633,14 @@ func loadConfigurationFile(path string) (options, error) { opts.UploadPrefix = s.Key("upload_prefix").MustString("") opts.PurgeRemote = s.Key("purge_remote").MustBool(false) + opts.B2Region = s.Key("b2_region").MustString("") + opts.B2Bucket = s.Key("b2_bucket").MustString("") + opts.B2Endpoint = s.Key("b2_endpoint").MustString("") + opts.B2KeyID = s.Key("b2_key_id").MustString("") + opts.B2AppKey = s.Key("b2_app_key").MustString("") + opts.B2ForcePath = s.Key("b2_force_path").MustBool(false) + opts.B2ConcurrentUploads = s.Key("b2_concurrent_uploads").MustInt(5) + opts.S3Region = s.Key("s3_region").MustString("") opts.S3Bucket = s.Key("s3_bucket").MustString("") opts.S3EndPoint = s.Key("s3_endpoint").MustString("") @@ -660,8 +699,12 @@ func loadConfigurationFile(path string) (options, error) { } } + if 
opts.B2ConcurrentUploads <= 0 { + return opts, fmt.Errorf("b2 concurrent uploads must be more than 0 (current %d)", opts.B2ConcurrentUploads) + } + // Validate upload option - stores := []string{"none", "s3", "sftp", "gcs", "azure"} + stores := []string{"none", "b2", "s3", "sftp", "gcs", "azure"} if err := validateEnum(opts.Upload, stores); err != nil { return opts, fmt.Errorf("invalid value for upload: %s", err) } @@ -844,6 +887,21 @@ func mergeCliAndConfigOptions(cliOpts options, configOpts options, onCli []strin case "purge-remote": opts.PurgeRemote = cliOpts.PurgeRemote + case "b2-region": + opts.B2Region = cliOpts.B2Region + case "b2-bucket": + opts.B2Bucket = cliOpts.B2Bucket + case "b2-endpoint": + opts.B2Endpoint = cliOpts.B2Endpoint + case "b2-key-id": + opts.B2KeyID = cliOpts.B2KeyID + case "b2-app-key": + opts.B2AppKey = cliOpts.B2AppKey + case "b2-force-path": + opts.B2ForcePath = cliOpts.B2ForcePath + case "b2-concurrent-uploads": + opts.B2ConcurrentUploads = cliOpts.B2ConcurrentUploads + case "s3-region": opts.S3Region = cliOpts.S3Region case "s3-bucket": diff --git a/config_test.go b/config_test.go index f1fd152..f6a9858 100644 --- a/config_test.go +++ b/config_test.go @@ -323,7 +323,7 @@ func TestParseCli(t *testing.T) { }, false, false, - "invalid value for --upload: value not found in [none s3 sftp gcs azure]", + "invalid value for --upload: value not found in [none b2 s3 sftp gcs azure]", "", }, { @@ -350,7 +350,7 @@ func TestParseCli(t *testing.T) { }, false, false, - "invalid value for --download: value not found in [none s3 sftp gcs azure]", + "invalid value for --download: value not found in [none b2 s3 sftp gcs azure]", "", }, { diff --git a/go.mod b/go.mod index 595143d..86d4bfc 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( cloud.google.com/go/iam v1.1.8 // indirect github.com/Azure/azure-sdk-for-go/sdk/azcore v1.12.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.9.0 // indirect + github.com/Backblaze/blazer 
v0.6.1
index 064c081..a5f21df 100644 --- a/upload.go +++ b/upload.go @@ -49,6 +49,7 @@ import ( "path/filepath" "strings" "time" + "github.com/Backblaze/blazer/b2" ) // A Repo is a remote service where we can upload files @@ -93,6 +94,11 @@ func NewRepo(kind string, opts options) (Repo, error) { if err != nil { return nil, fmt.Errorf("failed to prepare S3 repo: %w", err) } + case "b2": + repo, err = NewB2Repo(opts) + if err != nil { + return nil, fmt.Errorf("failed to prepare B2 repo: %w", err) + } case "sftp": repo, err = NewSFTPRepo(opts) if err != nil { @@ -113,6 +119,19 @@ func NewRepo(kind string, opts options) (Repo, error) { return repo, nil } +type b2repo struct { + concurrentUploads int + bucket string + region string + endpoint string + keyID string + appKey string + forcePath bool + b2Client *b2.Client + b2Bucket *b2.Bucket + ctx context.Context +} + type s3repo struct { region string bucket string @@ -125,6 +144,118 @@ type s3repo struct { session *session.Session } +func NewB2Repo(opts options) (*b2repo, error) { + r := &b2repo{ + region: opts.B2Region, + bucket: opts.B2Bucket, + endpoint: opts.B2Endpoint, + keyID: opts.B2KeyID, + appKey: opts.B2AppKey, + concurrentUploads: opts.B2ConcurrentUploads, + forcePath: opts.B2ForcePath, + ctx: context.Background(), + } + l.Verbosef("starting b2 client with %d connections to %s %s \n", r.concurrentUploads, r.endpoint, r.bucket) + client, err := b2.NewClient(r.ctx, r.keyID, r.appKey) + + if err != nil { + return nil, fmt.Errorf("could not create B2 session: %w", err) + } + + r.b2Client = client + + bucket, err := r.b2Client.Bucket(r.ctx, r.bucket) + + if err != nil { + return nil, fmt.Errorf("could not connect to B2 bucket: %w", err) + } + + r.b2Bucket = bucket + + return r, nil +} + +func (r *b2repo) Upload(path string, target string) error { + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + + w := r.b2Bucket.Object(target).NewWriter(r.ctx) + w.ConcurrentUploads = r.concurrentUploads + 
+ if _, err := io.Copy(w, f); err != nil { + w.Close() + return err + } + + return w.Close() +} + +func (r *b2repo) Download(target string, path string) error { + + file, err := os.Create(path) + if err != nil { + return fmt.Errorf("download error: %w", err) + } + defer file.Close() + + bucket := r.b2Bucket + + remoteFile := bucket.Object(path).NewReader(r.ctx) + defer remoteFile.Close() + + f, err := os.Create(target) + if err != nil { + return err + } + + if _, err := io.Copy(file, remoteFile); err != nil { + f.Close() + return err + } + return f.Close() +} + +func (r *b2repo) Close() error { + return nil +} + +func (r *b2repo) List(prefix string) ([]Item, error) { + + files := make([]Item, 0) + + i := r.b2Bucket.List(r.ctx, b2.ListPrefix(prefix)) + for i.Next() { + obj := i.Object() + + attributes, err := obj.Attrs(r.ctx) + + if err != nil { + return nil, err + } + + files = append(files, Item{ + key: obj.Name(), + modtime: attributes.LastModified, + }, + ) + } + + return files, i.Err() +} + +func (r *b2repo) Remove(path string) error { + ctx, cancel := context.WithCancel(r.ctx) + + defer cancel() + + err := r.b2Bucket.Object(path).Delete(ctx) + + return err +} + func NewS3Repo(opts options) (*s3repo, error) { r := &s3repo{ region: opts.S3Region,