Skip to content

Commit

Permalink
Add --checksum param to pipe, put, cp, mirror (#5043)
Browse files Browse the repository at this point in the history
Add checksums to the pipe, put, cp and mirror commands.

> Add checksum to uploaded object. Values: MD5, CRC32, CRC32C, SHA1 or SHA256. Requires server trailing headers support like AWS, MinIO and possibly others.

`--md5` is hidden, but still applied.

Uploads to buckets with object locking will still use MD5.

Bonus: `mc stat` shows checksum information.
  • Loading branch information
klauspost authored Sep 24, 2024
1 parent 3dac3d3 commit be87476
Show file tree
Hide file tree
Showing 14 changed files with 165 additions and 72 deletions.
27 changes: 21 additions & 6 deletions cmd/client-s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/minio/minio-go/v7/pkg/cors"
Expand Down Expand Up @@ -202,6 +203,9 @@ func (n notifyExpiringTLS) RoundTrip(req *http.Request) (res *http.Response, err
return res, err
}

// useTrailingHeaders will enable trailing headers on S3 clients.
// parseChecksum stores true here when a non-MD5 checksum type is requested,
// and the client factory copies the current value into
// minio.Options.TrailingHeaders when it builds a client.
// NOTE(review): the value is read at client construction time, so it
// presumably has to be set before the client for a given alias is first
// created — confirm against the client cache behaviour.
var useTrailingHeaders atomic.Bool

// newFactory encloses New function with client cache.
func newFactory() func(config *Config) (Client, *probe.Error) {
clientCache := make(map[uint32]*minio.Client)
Expand Down Expand Up @@ -251,11 +255,12 @@ func newFactory() func(config *Config) (Client, *probe.Error) {
var e error

options := minio.Options{
Creds: credentials.NewChainCredentials(credsChain),
Secure: useTLS,
Region: env.Get("MC_REGION", env.Get("AWS_REGION", "")),
BucketLookup: config.Lookup,
Transport: transport,
Creds: credentials.NewChainCredentials(credsChain),
Secure: useTLS,
Region: env.Get("MC_REGION", env.Get("AWS_REGION", "")),
BucketLookup: config.Lookup,
Transport: transport,
TrailingHeaders: useTrailingHeaders.Load(),
}

api, e = minio.New(hostName, &options)
Expand Down Expand Up @@ -1089,6 +1094,7 @@ func (c *S3Client) Put(ctx context.Context, reader io.Reader, size int64, progre
StorageClass: strings.ToUpper(putOpts.storageClass),
ServerSideEncryption: putOpts.sse,
SendContentMd5: putOpts.md5,
Checksum: putOpts.checksum,
DisableMultipart: putOpts.disableMultipart,
PartSize: putOpts.multipartSize,
NumThreads: putOpts.multipartThreads,
Expand Down Expand Up @@ -1652,6 +1658,7 @@ func (c *S3Client) Stat(ctx context.Context, opts StatOptions) (*ClientContent,
if opts.isZip {
o.Set("x-minio-extract", "true")
}
o.Set("x-amz-checksum-mode", "ENABLED")
ctnt, err := c.getObjectStat(ctx, bucket, path, o)
if err == nil {
return ctnt, nil
Expand Down Expand Up @@ -2272,7 +2279,15 @@ func (c *S3Client) objectInfo2ClientContent(bucket string, entry minio.ObjectInf
} else {
content.Type = os.FileMode(0o664)
}

// setChecksum records a non-empty checksum value under its algorithm name.
// The map is allocated lazily and entries are inserted into it, so an object
// that reports more than one checksum keeps all of them instead of only the
// last one assigned. When no checksum is present the map stays nil.
setChecksum := func(k, v string) {
	if v == "" {
		return
	}
	if content.Checksum == nil {
		content.Checksum = make(map[string]string, 1)
	}
	content.Checksum[k] = v
}
setChecksum("CRC32", entry.ChecksumCRC32)
setChecksum("CRC32C", entry.ChecksumCRC32C)
setChecksum("SHA1", entry.ChecksumSHA1)
setChecksum("SHA256", entry.ChecksumSHA256)
return content
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type PutOptions struct {
multipartThreads uint
concurrentStream bool
ifNotExists bool
checksum minio.ChecksumType
}

// StatOptions holds options of the HEAD operation
Expand Down Expand Up @@ -219,6 +220,7 @@ type ClientContent struct {
Metadata map[string]string
Tags map[string]string
UserMetadata map[string]string
Checksum map[string]string
ETag string
Expires time.Time

Expand Down
3 changes: 2 additions & 1 deletion cmd/common-methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func uploadSourceToTargetURL(ctx context.Context, uploadOpts uploadSourceToTarge
}

// Optimize for server side copy if the host is same.
if sourceAlias == targetAlias && !uploadOpts.isZip {
if sourceAlias == targetAlias && !uploadOpts.isZip && !uploadOpts.urls.checksum.IsSet() {
// preserve new metadata and save existing ones.
if uploadOpts.preserve {
currentMetadata, err := getAllMetadata(ctx, sourceAlias, sourceURL.String(), srcSSE, uploadOpts.urls)
Expand Down Expand Up @@ -495,6 +495,7 @@ func uploadSourceToTargetURL(ctx context.Context, uploadOpts uploadSourceToTarge
multipartSize: multipartSize,
multipartThreads: uint(multipartThreads),
ifNotExists: uploadOpts.ifNotExists,
checksum: uploadOpts.urls.checksum,
}

if isReadAt(reader) || length == 0 {
Expand Down
16 changes: 12 additions & 4 deletions cmd/cp-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/minio/cli"
json "github.com/minio/colorjson"
"github.com/minio/mc/pkg/probe"
"github.com/minio/minio-go/v7"
"github.com/minio/pkg/v3/console"
)

Expand Down Expand Up @@ -72,8 +73,9 @@ var (
Usage: "disable multipart upload feature",
},
cli.BoolFlag{
Name: "md5",
Usage: "force all upload(s) to calculate md5sum checksum",
Name: "md5",
Usage: "force all upload(s) to calculate md5sum checksum",
Hidden: true,
},
cli.StringFlag{
Name: "tags",
Expand All @@ -95,6 +97,7 @@ var (
Name: "zip",
Usage: "Extract from remote zip file (MinIO server source only)",
},
checksumFlag,
}
)

Expand Down Expand Up @@ -315,7 +318,6 @@ func doCopySession(ctx context.Context, cancelCopy context.CancelFunc, cli *cli.
} else {
pg = newAccounter(totalBytes)
}

sourceURLs := cli.Args()[:len(cli.Args())-1]
targetURL := cli.Args()[len(cli.Args())-1] // Last one is target

Expand All @@ -327,6 +329,11 @@ func doCopySession(ctx context.Context, cancelCopy context.CancelFunc, cli *cli.
newerThan := cli.String("newer-than")
rewind := cli.String("rewind")
versionID := cli.String("version-id")
md5, checksum := parseChecksum(cli)
if withLock {
// The Content-MD5 header is required for any request to upload an object with a retention period configured using Amazon S3 Object Lock.
md5, checksum = true, minio.ChecksumNone
}

go func() {
totalBytes := int64(0)
Expand Down Expand Up @@ -420,7 +427,8 @@ func doCopySession(ctx context.Context, cancelCopy context.CancelFunc, cli *cli.
}
}

cpURLs.MD5 = cli.Bool("md5") || withLock
cpURLs.MD5 = md5
cpURLs.checksum = checksum
cpURLs.DisableMultipart = cli.Bool("disable-multipart")

// Verify if previously copied, notify progress bar.
Expand Down
1 change: 1 addition & 0 deletions cmd/cp-url-syntax.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func checkCopySyntax(cliCtx *cli.Context) {
if len(cliCtx.Args()) < 2 {
showCommandHelpAndExit(cliCtx, 1) // last argument is exit code.
}
parseChecksum(cliCtx)

// extract URLs.
URLs := cliCtx.Args()
Expand Down
40 changes: 40 additions & 0 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@
package cmd

import (
"errors"
"fmt"
"strings"
"time"

"github.com/minio/cli"
"github.com/minio/mc/pkg/probe"
"github.com/minio/minio-go/v7"
)

const envPrefix = "MC_"
Expand Down Expand Up @@ -116,3 +121,38 @@ var encS3Flag = cli.StringSliceFlag{
Usage: "encrypt/decrypt objects using server-side default keys and configurations. (multiple keys can be provided).",
EnvVar: envPrefix + "ENC_S3",
}

// checksumFlag is the shared --checksum flag. It is registered on the cp,
// mirror, pipe and put commands, and its value is interpreted by
// parseChecksum. The empty default means no checksum was requested.
var checksumFlag = cli.StringFlag{
Name: "checksum",
Usage: "Add checksum to uploaded object. Values: MD5, CRC32, CRC32C, SHA1 or SHA256. Requires server trailing headers (AWS, MinIO)",
Value: "",
}

// parseChecksum reads the "md5" and "checksum" flags from ctx and reports
// whether Content-MD5 should be sent (useMD5) and which checksum type, if any,
// should be attached to uploads (ct).
//
// The checksum name is matched case-insensitively. "MD5" switches useMD5 on
// and leaves ct unset; an unrecognised name is reported through fatalIf. When
// ct ends up set, trailing headers are enabled through useTrailingHeaders, and
// combining such a checksum with MD5 is also reported through fatalIf.
func parseChecksum(ctx *cli.Context) (useMD5 bool, ct minio.ChecksumType) {
	useMD5 = ctx.Bool("md5")

	requested := ctx.String("checksum")
	if requested == "" {
		// No --checksum given: only the md5 flag matters.
		return useMD5, ct
	}

	trailerTypes := map[string]minio.ChecksumType{
		"CRC32":  minio.ChecksumCRC32,
		"CRC32C": minio.ChecksumCRC32C,
		"SHA1":   minio.ChecksumSHA1,
		"SHA256": minio.ChecksumSHA256,
	}
	upper := strings.ToUpper(requested)
	picked, known := trailerTypes[upper]
	switch {
	case known:
		ct = picked
	case upper == "MD5":
		useMD5 = true
	default:
		unknown := fmt.Errorf("unknown checksum type: %s. Should be one of MD5, CRC32, CRC32C, SHA1 or SHA256", requested)
		fatalIf(probe.NewError(unknown), "")
	}

	if !ct.IsSet() {
		return useMD5, ct
	}

	// A trailer-based checksum was selected; clients must be built with
	// trailing headers enabled for it to be sent.
	useTrailingHeaders.Store(true)
	if useMD5 {
		conflict := errors.New("cannot combine MD5 with checksum")
		fatalIf(probe.NewError(conflict), "")
	}
	return useMD5, ct
}
14 changes: 11 additions & 3 deletions cmd/mirror-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ var (
Usage: "preserve file(s)/object(s) attributes and bucket(s) policy/locking configuration(s) on target bucket(s)",
},
cli.BoolFlag{
Name: "md5",
Usage: "force all upload(s) to calculate md5sum checksum",
Name: "md5",
Usage: "force all upload(s) to calculate md5sum checksum",
Hidden: true,
},
cli.BoolFlag{
Name: "multi-master",
Expand Down Expand Up @@ -143,6 +144,7 @@ var (
Name: "skip-errors",
Usage: "skip any errors when mirroring",
},
checksumFlag,
}
)

Expand Down Expand Up @@ -510,6 +512,7 @@ func (mj *mirrorJob) doMirror(ctx context.Context, sURLs URLs, event EventInfo)
})
}
sURLs.MD5 = mj.opts.md5
sURLs.checksum = mj.opts.checksum
sURLs.DisableMultipart = mj.opts.disableMultipart

var ret URLs
Expand Down Expand Up @@ -695,6 +698,7 @@ func (mj *mirrorJob) watchMirrorEvents(ctx context.Context, events []EventInfo)
TargetAlias: targetAlias,
TargetContent: &ClientContent{URL: *targetURL},
MD5: mj.opts.md5,
checksum: mj.opts.checksum,
DisableMultipart: mj.opts.disableMultipart,
encKeyDB: mj.opts.encKeyDB,
}
Expand Down Expand Up @@ -722,6 +726,7 @@ func (mj *mirrorJob) watchMirrorEvents(ctx context.Context, events []EventInfo)
TargetAlias: targetAlias,
TargetContent: &ClientContent{URL: *targetURL},
MD5: mj.opts.md5,
checksum: mj.opts.checksum,
DisableMultipart: mj.opts.disableMultipart,
encKeyDB: mj.opts.encKeyDB,
}
Expand Down Expand Up @@ -977,6 +982,7 @@ func runMirror(ctx context.Context, srcURL, dstURL string, cli *cli.Context, enc

isWatch := cli.Bool("watch") || cli.Bool("multi-master") || cli.Bool("active-active")
isRemove := cli.Bool("remove")
md5, checksum := parseChecksum(cli)

// preserve is also expected to be overwritten if necessary
isMetadata := cli.Bool("a") || isWatch || len(userMetadata) > 0
Expand All @@ -990,7 +996,8 @@ func runMirror(ctx context.Context, srcURL, dstURL string, cli *cli.Context, enc
isMetadata: isMetadata,
isSummary: cli.Bool("summary"),
isRetriable: cli.Bool("retry"),
md5: cli.Bool("md5"),
md5: md5,
checksum: checksum,
disableMultipart: cli.Bool("disable-multipart"),
skipErrors: cli.Bool("skip-errors"),
excludeOptions: cli.StringSlice("exclude"),
Expand Down Expand Up @@ -1086,6 +1093,7 @@ func runMirror(ctx context.Context, srcURL, dstURL string, cli *cli.Context, enc
}
if err == nil {
mj.opts.md5 = true
mj.opts.checksum = minio.ChecksumNone
}
}
errorIf(copyBucketPolicies(ctx, newSrcClt, newDstClt, isOverwrite),
Expand Down
3 changes: 3 additions & 0 deletions cmd/mirror-url.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/minio/cli"
"github.com/minio/minio-go/v7"
"github.com/minio/pkg/v3/wildcard"
)

Expand All @@ -39,6 +40,7 @@ func checkMirrorSyntax(ctx context.Context, cliCtx *cli.Context, encKeyDB map[st
if len(cliCtx.Args()) != 2 {
showCommandHelpAndExit(cliCtx, 1) // last argument is exit code.
}
parseChecksum(cliCtx)

// extract URLs.
URLs := cliCtx.Args()
Expand Down Expand Up @@ -274,6 +276,7 @@ type mirrorOptions struct {
olderThan, newerThan string
storageClass string
userMetadata map[string]string
checksum minio.ChecksumType
}

// Prepares urls that need to be copied or removed based on requested options.
Expand Down
5 changes: 4 additions & 1 deletion cmd/pipe-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var pipeFlags = []cli.Flag{
Usage: "increase the pipe buffer size to a custom value",
Hidden: true,
},
checksumFlag,
}

// Display contents of a file.
Expand Down Expand Up @@ -123,7 +124,7 @@ func pipe(ctx *cli.Context, targetURL string, encKeyDB map[string][]prefixSSEPai
// When no target is specified, pipe cat's stdin to stdout.
return catOut(os.Stdin, -1).Trace()
}

md5, checksum := parseChecksum(ctx)
storageClass := ctx.String("storage-class")
alias, _ := url2Alias(targetURL)
sseKey := getSSE(targetURL, encKeyDB[alias])
Expand Down Expand Up @@ -153,6 +154,8 @@ func pipe(ctx *cli.Context, targetURL string, encKeyDB map[string][]prefixSSEPai
multipartSize: multipartSize,
multipartThreads: uint(multipartThreads),
concurrentStream: ctx.IsSet("concurrent"),
md5: md5,
checksum: checksum,
}

var reader io.Reader
Expand Down
4 changes: 4 additions & 0 deletions cmd/put-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
// put command flags.
var (
putFlags = []cli.Flag{
checksumFlag,
cli.IntFlag{
Name: "parallel, P",
Usage: "upload number of parts in parallel",
Expand Down Expand Up @@ -121,6 +122,7 @@ func mainPut(cliCtx *cli.Context) (e error) {
err.Trace(cliCtx.Args()...)
}
fatalIf(err, "SSE Error")
md5, checksum := parseChecksum(cliCtx)

if len(args) < 2 {
fatalIf(errInvalidArgument().Trace(args...), "Invalid number of arguments.")
Expand Down Expand Up @@ -154,6 +156,8 @@ func mainPut(cliCtx *cli.Context) (e error) {
putURLsCh <- putURLs
break
}
putURLs.checksum = checksum
putURLs.MD5 = md5
totalBytes += putURLs.SourceContent.Size
pg.SetTotal(totalBytes)
totalObjects++
Expand Down
6 changes: 6 additions & 0 deletions cmd/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type statMessage struct {
VersionID string `json:"versionID,omitempty"`
DeleteMarker bool `json:"deleteMarker,omitempty"`
Restore *minio.RestoreInfo `json:"restore,omitempty"`
Checksum map[string]string `json:"checksum,omitempty"`
}

func (stat statMessage) String() (msg string) {
Expand Down Expand Up @@ -86,6 +87,10 @@ func (stat statMessage) String() (msg string) {
msgBuilder.WriteString(fmt.Sprintf("%-10s: %s (lifecycle-rule-id: %s) ", "Expiration",
stat.Expiration.Local().Format(printDate), stat.ExpirationRuleID) + "\n")
}
if len(stat.Checksum) > 0 {
cs := strings.TrimSuffix(strings.TrimPrefix(fmt.Sprintf("%v", stat.Checksum), "map["), "]")
msgBuilder.WriteString(fmt.Sprintf("%-10s: %v", "Checksum", cs) + "\n")
}
if stat.Restore != nil {
msgBuilder.WriteString(fmt.Sprintf("%-10s:", "Restore") + "\n")
if !stat.Restore.ExpiryTime.IsZero() && !stat.Restore.ExpiryTime.Equal(timeSentinel) {
Expand Down Expand Up @@ -192,6 +197,7 @@ func parseStat(c *ClientContent) statMessage {
content.ExpirationRuleID = c.ExpirationRuleID
content.ReplicationStatus = c.ReplicationStatus
content.Restore = c.Restore
content.Checksum = c.Checksum
return content
}

Expand Down
Loading

0 comments on commit be87476

Please sign in to comment.