Skip to content

Commit

Permalink
Add incremental context send support
Browse files Browse the repository at this point in the history
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
  • Loading branch information
tonistiigi committed Jun 22, 2017
1 parent ebe8d28 commit 8278298
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 37 deletions.
51 changes: 46 additions & 5 deletions cli/command/formatter/disk_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ const (
// DiskUsageContext contains disk usage specific information required by the formatter, encapsulate a Context struct.
type DiskUsageContext struct {
Context
Verbose bool
LayersSize int64
Images []*types.ImageSummary
Containers []*types.Container
Volumes []*types.Volume
Verbose bool
LayersSize int64
Images []*types.ImageSummary
Containers []*types.Container
Volumes []*types.Volume
BuilderSize int64
}

func (ctx *DiskUsageContext) startSubsection(format string) (*template.Template, error) {
Expand Down Expand Up @@ -97,6 +98,13 @@ func (ctx *DiskUsageContext) Write() (err error) {
return err
}

err = ctx.contextFormat(tmpl, &diskUsageBuilderContext{
builderSize: ctx.BuilderSize,
})
if err != nil {
return err
}

diskUsageContainersCtx := diskUsageContainersContext{containers: []*types.Container{}}
diskUsageContainersCtx.header = map[string]string{
"Type": typeHeader,
Expand Down Expand Up @@ -179,6 +187,9 @@ func (ctx *DiskUsageContext) verboseWrite() (err error) {
}
}
ctx.postFormat(tmpl, newVolumeContext())

// And build cache
fmt.Fprintf(ctx.Output, "\nBuild cache usage: %s\n\n", units.HumanSize(float64(ctx.BuilderSize)))
return
}

Expand Down Expand Up @@ -357,3 +368,33 @@ func (c *diskUsageVolumesContext) Reclaimable() string {

return fmt.Sprintf("%s", units.HumanSize(float64(reclaimable)))
}

type diskUsageBuilderContext struct {
HeaderContext
verbose bool
builderSize int64
}

func (c *diskUsageBuilderContext) MarshalJSON() ([]byte, error) {
return marshalJSON(c)
}

func (c *diskUsageBuilderContext) Type() string {
return "Build Cache"
}

func (c *diskUsageBuilderContext) TotalCount() string {
return ""
}

func (c *diskUsageBuilderContext) Active() string {
return ""
}

func (c *diskUsageBuilderContext) Size() string {
return units.HumanSize(float64(c.builderSize))
}

func (c *diskUsageBuilderContext) Reclaimable() string {
return c.Size()
}
12 changes: 12 additions & 0 deletions cli/command/formatter/disk_usage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func TestDiskUsageContextFormatWrite(t *testing.T) {
Images 0 0 0B 0B
Containers 0 0 0B 0B
Local Volumes 0 0 0B 0B
Build Cache 0B 0B
`,
},
{
Expand All @@ -38,6 +39,9 @@ CONTAINER ID IMAGE COMMAND LOCAL VOLUMES
Local Volumes space usage:
VOLUME NAME LINKS SIZE
Build cache usage: 0B
`,
},
// Errors
Expand Down Expand Up @@ -70,6 +74,7 @@ VOLUME NAME LINKS SIZE
Images 0 0 0B 0B
Containers 0 0 0B 0B
Local Volumes 0 0 0B 0B
Build Cache 0B 0B
`,
},
{
Expand All @@ -82,6 +87,7 @@ Local Volumes 0 0 0B
Images 0
Containers 0
Local Volumes 0
Build Cache
`,
},
// Raw Format
Expand Down Expand Up @@ -109,6 +115,12 @@ active: 0
size: 0B
reclaimable: 0B
type: Build Cache
total:
active:
size: 0B
reclaimable: 0B
`,
},
}
Expand Down
168 changes: 141 additions & 27 deletions cli/command/image/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"path/filepath"
"regexp"
"runtime"
"sync"
"time"

"github.com/Sirupsen/logrus"
"github.com/docker/cli/cli"
Expand All @@ -28,6 +30,7 @@ import (
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/versions"
"github.com/docker/docker/client/session"
"github.com/docker/docker/client/session/filesync"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/progress"
Expand All @@ -37,6 +40,7 @@ import (
"github.com/pkg/errors"
"github.com/spf13/cobra"
"golang.org/x/net/context"
"golang.org/x/time/rate"
)

type buildOptions struct {
Expand Down Expand Up @@ -69,6 +73,7 @@ type buildOptions struct {
squash bool
target string
imageIDFile string
stream bool
}

// dockerfileFromStdin returns true when the user specified that the Dockerfile
Expand Down Expand Up @@ -141,6 +146,10 @@ func NewBuildCommand(dockerCli *command.DockerCli) *cobra.Command {
flags.SetAnnotation("squash", "experimental", nil)
flags.SetAnnotation("squash", "version", []string{"1.25"})

flags.BoolVar(&options.stream, "stream", false, "Stream attaches to server to negotiate build context")
flags.SetAnnotation("stream", "experimental", nil)
flags.SetAnnotation("stream", "version", []string{"1.31"})

return cmd
}

Expand All @@ -161,7 +170,7 @@ func (out *lastProgressOutput) WriteProgress(prog progress.Progress) error {
}

func isSessionSupported(dockerCli *command.DockerCli) bool {
return dockerCli.ServerInfo().HasExperimental && versions.GreaterThanOrEqualTo(dockerCli.Client().ClientVersion(), "1.30")
return dockerCli.ServerInfo().HasExperimental && versions.GreaterThanOrEqualTo(dockerCli.Client().ClientVersion(), "1.31")
}

// nolint: gocyclo
Expand Down Expand Up @@ -224,7 +233,7 @@ func runBuild(dockerCli *command.DockerCli, options buildOptions) error {
contextDir = tempDir
}

if buildCtx == nil {
if buildCtx == nil && !options.stream {
excludes, err := build.ReadDockerignore(contextDir)
if err != nil {
return err
Expand Down Expand Up @@ -263,6 +272,15 @@ func runBuild(dockerCli *command.DockerCli, options buildOptions) error {
}
}

if options.stream && dockerfileCtx == nil {
f, err := os.Open(relDockerfile)
if err != nil {
return errors.Wrapf(err, "failed to open %s", relDockerfile)
}
defer f.Close()
dockerfileCtx = f
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -271,9 +289,17 @@ func runBuild(dockerCli *command.DockerCli, options buildOptions) error {
translator := func(ctx context.Context, ref reference.NamedTagged) (reference.Canonical, error) {
return TrustedReference(ctx, dockerCli, ref, nil)
}
// Wrap the tar archive to replace the Dockerfile entry with the rewritten
// Dockerfile which uses trusted pulls.
buildCtx = replaceDockerfileTarWrapper(ctx, buildCtx, relDockerfile, translator, &resolvedTags)
if buildCtx != nil {
// Wrap the tar archive to replace the Dockerfile entry with the rewritten
// Dockerfile which uses trusted pulls.
buildCtx = replaceDockerfileTarWrapper(ctx, buildCtx, relDockerfile, translator, &resolvedTags)
} else if dockerfileCtx != nil {
newDockerfile, _, err := rewriteDockerfileFrom(context.Background(), dockerfileCtx, translator)
if err != nil {
return err
}
dockerfileCtx = ioutil.NopCloser(bytes.NewBuffer(newDockerfile))
}
}

// Setup an upload progress bar
Expand All @@ -297,7 +323,44 @@ func runBuild(dockerCli *command.DockerCli, options buildOptions) error {
}
}

var body io.Reader = progress.NewProgressReader(buildCtx, progressOutput, 0, "", "Sending build context to Docker daemon")
var body io.Reader
if buildCtx != nil && !options.stream {
body = progress.NewProgressReader(buildCtx, progressOutput, 0, "", "Sending build context to Docker daemon")
}

if options.stream {
excludes, err := build.ReadDockerignore(contextDir)
if err != nil {
return err
}

p := &sizeProgress{out: progressOutput, action: "Streaming build context to Docker daemon"}

workdirProvider := filesync.NewFSSyncProvider(contextDir, excludes)
s.Allow(workdirProvider)

syncDone := make(chan error)

// this will be replaced on parallel build jobs. keep the current
// progressbar for now
if snpc, ok := workdirProvider.(interface {
SetNextProgressCallback(func(int, bool), chan error)
}); ok {
snpc.SetNextProgressCallback(p.update, syncDone)
}

buf := newBufferedWriter(syncDone, buildBuff)
defer func() {
select {
case <-buf.flushed:
case <-ctx.Done():
}
}()
buildBuff = buf

remote = "client-session"
body = buildCtx
}

authConfigs, _ := dockerCli.GetAllCredentials()
buildOptions := types.ImageBuildOptions{
Expand Down Expand Up @@ -347,6 +410,7 @@ func runBuild(dockerCli *command.DockerCli, options buildOptions) error {
if options.quiet {
fmt.Fprintf(dockerCli.Err(), "%s", progBuff)
}
cancel()
return err
}
defer response.Body.Close()
Expand Down Expand Up @@ -414,34 +478,84 @@ func runBuild(dockerCli *command.DockerCli, options buildOptions) error {
return nil
}

type sizeProgress struct {
out progress.Output
action string
limiter *rate.Limiter
}

func (sp *sizeProgress) update(size int, last bool) {
if sp.limiter == nil {
sp.limiter = rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
}
if last || sp.limiter.Allow() {
sp.out.WriteProgress(progress.Progress{Action: sp.action, Current: int64(size), LastUpdate: last})
}
}

type bufferedWriter struct {
done chan error
io.Writer
buf *bytes.Buffer
flushed chan struct{}
mu sync.Mutex
}

func newBufferedWriter(done chan error, w io.Writer) *bufferedWriter {
bw := &bufferedWriter{done: done, Writer: w, buf: new(bytes.Buffer), flushed: make(chan struct{})}
go func() {
<-done
bw.flushBuffer()
}()
return bw
}

func (bw *bufferedWriter) Write(dt []byte) (int, error) {
select {
case <-bw.done:
bw.flushBuffer()
return bw.Writer.Write(dt)
default:
return bw.buf.Write(dt)
}
}

func (bw *bufferedWriter) flushBuffer() {
bw.mu.Lock()
select {
case <-bw.flushed:
default:
bw.Writer.Write(bw.buf.Bytes())
close(bw.flushed)
}
bw.mu.Unlock()
}

func getBuildSharedKey(dir string) (string, error) {
// build session is hash of build dir with node based randomness
sessionFile := filepath.Join(cliconfig.Dir(), ".buildsharedkey")
if _, err := os.Lstat(sessionFile); err != nil {
if os.IsNotExist(err) {
b := make([]byte, 32)
if _, err := rand.Read(b); err != nil {
return "", err
}
if err := os.MkdirAll(cliconfig.Dir(), 0600); err != nil {
return "", err
}
if err := ioutil.WriteFile(sessionFile, []byte(hex.EncodeToString(b)), 0600); err != nil {
return "", err
dt := []byte(cliconfig.Dir()) // if no access to config dir then only use path
if err := os.MkdirAll(cliconfig.Dir(), 0700); err == nil {
// build session is hash of build dir with node based randomness
sessionFile := filepath.Join(cliconfig.Dir(), ".buildsharedkey")
if _, err := os.Lstat(sessionFile); err != nil {
if os.IsNotExist(err) {
b := make([]byte, 32)
if _, err := rand.Read(b); err != nil {
return "", err
}
if err := ioutil.WriteFile(sessionFile, []byte(hex.EncodeToString(b)), 0600); err != nil {
return "", err
}
}
} else {
return "", err
}
}

dt, err := ioutil.ReadFile(sessionFile)
if err != nil {
return "", errors.Wrapf(err, "failed to read %s", sessionFile)
dt, err = ioutil.ReadFile(sessionFile)
if err != nil {
return "", errors.Wrapf(err, "failed to read %s", sessionFile)

}
}

s := sha256.Sum256([]byte(fmt.Sprintf("%s:%s", dt, dir)))
return hex.EncodeToString(s[:]), nil // add randomness to force recheck
return hex.EncodeToString(s[:]), nil
}

func isLocalDir(c string) bool {
Expand Down
11 changes: 6 additions & 5 deletions cli/command/system/df.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ func runDiskUsage(dockerCli *command.DockerCli, opts diskUsageOptions) error {
Output: dockerCli.Out(),
Format: formatter.NewDiskUsageFormat(format),
},
LayersSize: du.LayersSize,
Images: du.Images,
Containers: du.Containers,
Volumes: du.Volumes,
Verbose: opts.verbose,
LayersSize: du.LayersSize,
BuilderSize: du.BuilderSize,
Images: du.Images,
Containers: du.Containers,
Volumes: du.Volumes,
Verbose: opts.verbose,
}

return duCtx.Write()
Expand Down
Loading

0 comments on commit 8278298

Please sign in to comment.