Skip to content

Commit

Permalink
pga: make get/list cancellable
Browse files Browse the repository at this point in the history
Control+C will make pga to shutdown gracefully and clean up after itself, leaving no traces of temporary downloads.

Signed-off-by: Santiago M. Mola <santi@mola.io>
  • Loading branch information
smola committed Jun 15, 2018
1 parent 9f0f6b4 commit fb2704b
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 23 deletions.
40 changes: 32 additions & 8 deletions PublicGitArchive/pga/cmd/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"compress/gzip"
"context"
"fmt"
"io"
"os/user"
Expand All @@ -17,7 +18,7 @@ const (

// updateCache checks whether a new version of the file in url exists and downloads it
// to dest. It returns an error when it was not possible to update it.
func updateCache(dest, source FileSystem, name string) error {
func updateCache(ctx context.Context, dest, source FileSystem, name string) error {
logrus.Debugf("syncing %s to %s", source.Abs(name), dest.Abs(name))
if upToDate(dest, source, name) {
logrus.Debugf("local copy is up to date")
Expand All @@ -26,9 +27,7 @@ func updateCache(dest, source FileSystem, name string) error {

logrus.Debugf("local copy is outdated or non existent")
tmpName := name + ".tmp"
if err := copy(source, dest, name, tmpName); err != nil {
logrus.Warningf("copy from % to %s failed: %v",
source.Abs(name), dest.Abs(tmpName), err)
if err := copy(ctx, source, dest, name, tmpName); err != nil {
if cerr := dest.Remove(tmpName); cerr != nil {
logrus.Warningf("error removing temporary file %s: %v",
dest.Abs(tmpName), cerr)
Expand All @@ -46,7 +45,9 @@ func updateCache(dest, source FileSystem, name string) error {
return nil
}

func copy(source, dest FileSystem, sourceName, destName string) (err error) {
func copy(ctx context.Context, source, dest FileSystem,
sourceName, destName string) (err error) {

wc, err := dest.Create(destName)
if err != nil {
return fmt.Errorf("could not create %s: %v", dest.Abs(destName), err)
Expand All @@ -59,14 +60,37 @@ func copy(source, dest FileSystem, sourceName, destName string) (err error) {
}
defer checkClose(source.Abs(sourceName), rc, &err)

if _, err = io.Copy(wc, rc); err != nil {
if _, err = cancelableCopy(ctx, wc, rc); err != nil {
return fmt.Errorf("could not copy %s to %s: %v",
source.Abs(sourceName), dest.Abs(destName), err)
}

return err
}

func cancelableCopy(ctx context.Context, dst io.Writer, src io.Reader) (
int64, error) {

var written int64
for {
select {
case <-ctx.Done():
return written, fmt.Errorf("download interrupted")
default:
}

w, err := io.CopyN(dst, src, 512*1024)
written += w
if err == io.EOF {
return written, nil
}

if err != nil {
return written, err
}
}
}

func upToDate(dest, source FileSystem, name string) bool {
ok, err := matchHash(dest, source, name)
if err == nil {
Expand Down Expand Up @@ -101,15 +125,15 @@ func matchHash(dest, source FileSystem, name string) (bool, error) {
return localHash == remoteHash, nil
}

func getIndex() (io.ReadCloser, error) {
func getIndex(ctx context.Context) (io.ReadCloser, error) {
usr, err := user.Current()
if err != nil {
return nil, err
}
dest := localFS(filepath.Join(usr.HomeDir, ".pga"))
source := urlFS(indexURL)

if err := updateCache(dest, source, indexName); err != nil {
if err := updateCache(ctx, dest, source, indexName); err != nil {
return nil, err
}

Expand Down
61 changes: 47 additions & 14 deletions PublicGitArchive/pga/cmd/get.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package cmd

import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"

"github.com/spf13/cobra"
"github.com/src-d/datasets/PublicGitArchive/pga/pga"
pb "gopkg.in/cheggaaa/pb.v1"
log "gopkg.in/src-d/go-log.v0"
)

// getCmd represents the get command
Expand All @@ -21,6 +25,8 @@ var getCmd = &cobra.Command{
Alternatively, a list of .siva filenames can be passed through standard input.`,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := setupContext()

source := urlFS("http://pga.sourced.tech/")

dest, err := FileSystemFromFlags(cmd.Flags())
Expand Down Expand Up @@ -54,7 +60,7 @@ Alternatively, a list of .siva filenames can be passed through standard input.`,
filenames = append(filenames, filename)
}
} else {
f, err := getIndex()
f, err := getIndex(ctx)
if err != nil {
return fmt.Errorf("could not open index file: %v", err)
}
Expand Down Expand Up @@ -82,41 +88,68 @@ Alternatively, a list of .siva filenames can be passed through standard input.`,
}
}

return downloadFilenames(dest, source, filenames, maxDownloads)
return downloadFilenames(ctx, dest, source, filenames, maxDownloads)
},
}

func downloadFilenames(dest, source FileSystem, filenames []string,
maxDownloads int) error {
func downloadFilenames(ctx context.Context, dest, source FileSystem,
filenames []string, maxDownloads int) error {

tokens := make(chan bool, maxDownloads)
for i := 0; i < maxDownloads; i++ {
tokens <- true
}

done := make(chan bool)
done := make(chan error)
for _, filename := range filenames {
filename := filepath.Join("siva", "latest", filename[:2], filename)
go func() {
<-tokens
select {
case <-tokens:
case <-ctx.Done():
done <- fmt.Errorf("canceled")
return
}
defer func() { tokens <- true }()

if err := updateCache(dest, source, filename); err != nil {
err := updateCache(ctx, dest, source, filename)
if err != nil {
fmt.Fprintf(os.Stderr, "could not get %s: %v\n", filename, err)
}
done <- true

done <- err
}()
}

bar := pb.StartNew(len(filenames))
for i := 1; ; i++ {
<-done
bar.Set(i)
bar.Update()
if i == len(filenames) {
return nil
var finalErr error
for i := 1; i <= len(filenames); i++ {
if err := <-done; err != nil {
finalErr = fmt.Errorf("there where failed downloads")
}

if finalErr == nil {
bar.Set(i)
bar.Update()
}
}

return finalErr
}

func setupContext() context.Context {
ctx, cancel := context.WithCancel(context.Background())
var term = make(chan os.Signal)
go func() {
select {
case <-term:
log.Warningf("signal received, stopping...")
cancel()
}
}()
signal.Notify(term, syscall.SIGTERM, os.Interrupt)

return ctx
}

func init() {
Expand Down
3 changes: 2 additions & 1 deletion PublicGitArchive/pga/cmd/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ var listCmd = &cobra.Command{
Short: "list all the repositories in the index",
Long: `List the repositories in the index, use flags to filter the results.`,
RunE: func(cmd *cobra.Command, args []string) error {
f, err := getIndex()
ctx := setupContext()
f, err := getIndex(ctx)
if err != nil {
return fmt.Errorf("could not open index file: %v", err)
}
Expand Down

0 comments on commit fb2704b

Please sign in to comment.