diff --git a/commands/agent.go b/commands/agent.go new file mode 100644 index 0000000..a3ac786 --- /dev/null +++ b/commands/agent.go @@ -0,0 +1,64 @@ +package commands + +import ( + "io/ioutil" + "log" + "os" + "os/signal" + "path/filepath" + + "github.com/ChrisMcKenzie/dropship/service" + "github.com/hashicorp/hcl" + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +const maxGoRoutines = 10 + +var agentCmd = &cobra.Command{ + Use: "agent", + Short: "starts automatic checks and update", + Run: agent, +} + +func agent(c *cobra.Command, args []string) { + InitializeConfig() + + root := viper.GetString("servicePath") + services, err := loadServices(root) + if err != nil { + log.Fatalln(err) + } + + t := service.NewRunner(len(services)) + shutdownCh := make(chan struct{}) + + for _, s := range services { + service.NewDispatcher(s, t, shutdownCh) + } + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, os.Interrupt) + <-sigs + close(shutdownCh) + + t.Shutdown() +} + +func loadServices(root string) (d []service.Config, err error) { + files, _ := filepath.Glob(root + "/*.hcl") + for _, file := range files { + var data []byte + data, err = ioutil.ReadFile(file) + if err != nil { + return + } + + var deploy struct { + Services []service.Config `hcl:"service,expand"` + } + hcl.Decode(&deploy, string(data)) + d = append(d, deploy.Services...) + } + return +} diff --git a/commands/agent/action.go b/commands/agent/action.go deleted file mode 100644 index 466e1f4..0000000 --- a/commands/agent/action.go +++ /dev/null @@ -1,70 +0,0 @@ -package agent - -import ( - "log" - "os" - "os/signal" - "sync" - "syscall" - - "github.com/ChrisMcKenzie/dropship/repo/rackspace" - "github.com/spf13/cobra" -) - -var Command = &cobra.Command{ - Use: "agent", - Short: "Start updater service", - Long: `Starts an agent that will check and update all defined services -on a given interval - `, - Run: Action, -} - -var configPath string - -func init() { - Command.Flags().StringVar(&configPath, "config", "", "path to config file") -} - -func Action(cmd *cobra.Command, args []string) { - if configPath == "" { - log.Fatal("config must be supplied") - } - - config, err := loadConfig(configPath) - if err != nil { - log.Fatal(err) - } - - services := loadServices(config.ConfigDir) - - rackspace.Setup( - config.Rackspace.User, - config.Rackspace.Key, - config.Rackspace.Region, - ) - - ch := make(chan struct{}) - var wg *sync.WaitGroup = &sync.WaitGroup{} - wg.Add(1) - - for _, service := range services { - log.Println("setting up service", service.Id) - updater, err := setup(service, ch) - if err != nil { - panic(err) - } - go updater.Start(wg) - } - - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - - go func(ch chan struct{}, sigs chan os.Signal) { - <-sigs - close(ch) - wg.Done() - }(ch, sigs) - - wg.Wait() -} diff --git a/commands/agent/config.go b/commands/agent/config.go deleted file mode 100644 index 080fbfb..0000000 --- a/commands/agent/config.go +++ /dev/null @@ -1,29 +0,0 @@ -package agent - -import ( - "io/ioutil" - - "github.com/hashicorp/hcl" -) - -type Config struct { - ConfigDir string `hcl:"config"` - LockHost string `hcl:"lockhost"` - Rackspace struct { - User string `hcl:"user"` - Key string `hcl:"key"` - Region string `hcl:"region"` - } `hcl:"rackspace"` -} - -func loadConfig(path string) (*Config, error) { - data, err := ioutil.ReadFile(path) - if err != nil { - return nil, err - } - - config := &Config{} - err = hcl.Decode(config, string(data)) - - return config, err -} diff --git a/commands/agent/semaphore.go b/commands/agent/semaphore.go deleted file mode 100644 index a6cdead..0000000 --- a/commands/agent/semaphore.go +++ /dev/null @@ -1,19 +0,0 @@ -package agent - -import ( - "path/filepath" - - "github.com/ChrisMcKenzie/dropship/structs" - "github.com/hashicorp/consul/api" -) - -const BasePrefix = "dropship/locks" - -func AcquireLock(s structs.Service) (*api.Lock, error) { - client, err := api.NewClient(api.DefaultConfig()) - if err != nil { - return nil, err - } - - return client.LockKey(filepath.Join(BasePrefix, s.Name)) -} diff --git a/commands/agent/services.go b/commands/agent/services.go deleted file mode 100644 index b48d2af..0000000 --- a/commands/agent/services.go +++ /dev/null @@ -1,24 +0,0 @@ -package agent - -import ( - "io/ioutil" - "path/filepath" - - "github.com/ChrisMcKenzie/dropship/structs" - "github.com/hashicorp/hcl" -) - -func loadServices(root string) (d []structs.Service) { - files, _ := filepath.Glob(root + "/*.hcl") - for _, file := range files { - data, err := ioutil.ReadFile(file) - if err != nil { - panic(err) - } - - var deploy structs.Deployment - hcl.Decode(&deploy, string(data)) - d = append(d, deploy.Services...) - } - return -} diff --git a/commands/agent/services_test.go b/commands/agent/services_test.go deleted file mode 100644 index c0b7b94..0000000 --- a/commands/agent/services_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package agent - -import "testing" - -func TestServices(t *testing.T) { - services := loadServices("../..") - if len(services) == 0 { - t.Error("expected services to be loaded") - t.Fail() - } -} diff --git a/commands/agent/setup.go b/commands/agent/setup.go deleted file mode 100644 index 412cddd..0000000 --- a/commands/agent/setup.go +++ /dev/null @@ -1,34 +0,0 @@ -package agent - -import ( - "errors" - "time" - - "github.com/ChrisMcKenzie/dropship/repo" - "github.com/ChrisMcKenzie/dropship/structs" -) - -func setup(service structs.Service, shutdownCh <-chan struct{}) (*updater, error) { - if service.CheckInterval == "" { - service.CheckInterval = "10s" - } - - tickerDur, err := time.ParseDuration(service.CheckInterval) - if err != nil { - return nil, err - } - - repo := repo.GetRepo(service.Artifact.Repo) - if repo == nil { - return nil, errors.New("Unable to find repo " + service.Artifact.Repo) - } - - updater := &updater{ - time.NewTicker(tickerDur), - shutdownCh, - service, - repo, - } - - return updater, nil -} diff --git a/commands/agent/setup_test.go b/commands/agent/setup_test.go deleted file mode 100644 index b062ead..0000000 --- a/commands/agent/setup_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package agent - -import ( - "bytes" - "io" - "testing" - - "github.com/ChrisMcKenzie/dropship/repo" - "github.com/ChrisMcKenzie/dropship/structs" -) - -var ( - mockCh = make(chan struct{}) - mockService = structs.Service{ - CheckInterval: "", - Artifact: structs.ArtifactConfig{ - Repo: "mock", - }, - } -) - -type MockRepo struct{} - -func (r *MockRepo) GetName() string { - return "mock" -} - -func (r *MockRepo) IsUpdated(s structs.Service) (bool, error) { - return true, nil -} - -func (r *MockRepo) Download(s structs.Service) (io.Reader, repo.MetaData, error) { - return new(bytes.Buffer), repo.MetaData{}, nil -} - -func TestSetup(t *testing.T) { - repo.Register(&MockRepo{}) - - updater, err := setup(mockService, mockCh) - if err != nil { - t.Error(err) - t.Fail() - } - - if updater.service.CheckInterval != "10s" { - t.Error("CheckInterval was not properly defaulted.") - t.Fail() - } - - mockService.CheckInterval = "invalid time" - _, intErr := setup(mockService, mockCh) - if intErr == nil { - t.Error("expected setup to fail due to bad interval") - t.Fail() - } - - mockService.CheckInterval = "1s" - mockService.Artifact.Repo = "Unknown Repo" - _, repoErr := setup(mockService, mockCh) - if repoErr == nil { - t.Error("expected setup to fail due to bad repo") - t.Fail() - } -} diff --git a/commands/agent/updater.go b/commands/agent/updater.go deleted file mode 100644 index 143e3fa..0000000 --- a/commands/agent/updater.go +++ /dev/null @@ -1,96 +0,0 @@ -package agent - -import ( - "log" - "os/exec" - "strings" - "sync" - "time" - - "github.com/ChrisMcKenzie/dropship/repo" - "github.com/ChrisMcKenzie/dropship/structs" -) - -type updater struct { - ticker *time.Ticker - shutdownCh <-chan struct{} - service structs.Service - repo repo.Repo -} - -func (u *updater) Start(wg *sync.WaitGroup) { - wg.Add(1) - defer wg.Done() - - log.Println("Starting", u.service.Id, "updater") - for { - select { - case <-u.ticker.C: - log.Println("Performing", u.service.Id, "update check") - u.check() - case _, ok := <-u.shutdownCh: - if !ok { - log.Println("Stopping", u.service.Id, "update check") - u.ticker.Stop() - return - } - } - } -} - -func (u *updater) check() { - isUpToDate, err := u.repo.IsUpdated(u.service) - - if err != nil { - log.Println(err) - } - - // check the md5sums - if !isUpToDate { - u.update() - } - - return -} - -func (u *updater) update() { - log.Println("Starting update") - if u.service.SequentialUpdate { - log.Println("Acquiring lock for", u.service.Name) - lock, err := AcquireLock(u.service) - _, err = lock.Lock(nil) - if err != nil { - panic(err) - } - defer func() { - log.Println("Releasing lock for", u.service.Name) - lock.Unlock() - }() - } - - file, meta, err := u.repo.Download(u.service) - - if err != nil { - log.Println(err) - } - - log.Println("Finished Downloading") - if meta.ContentType == "application/x-gzip" { - err := untar(file, u.service.Artifact.Dest) - if err != nil { - log.Println(err) - } - } - - u.service.Hash = meta.Hash - log.Println("Setting current version to", u.service.Hash) - - if u.service.Command != "" { - cmd := strings.Fields(u.service.Command) - out, err := exec.Command(cmd[0], cmd[1:]...).Output() - if err != nil { - log.Println(err) - } - log.Println(string(out)) - } -} diff --git a/commands/agent/updater_test.go b/commands/agent/updater_test.go deleted file mode 100644 index 82f23a4..0000000 --- a/commands/agent/updater_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package agent - -import "testing" - -func TestUpdater(t *testing.T) { - -} diff --git a/commands/dropship.go b/commands/dropship.go index a33be51..402e2ff 100644 --- a/commands/dropship.go +++ b/commands/dropship.go @@ -1,21 +1,65 @@ package commands import ( - "github.com/ChrisMcKenzie/dropship/commands/agent" + "io/ioutil" + "log" + "os" + + "github.com/hashicorp/hcl" "github.com/spf13/cobra" + "github.com/spf13/viper" ) +type Config struct { + ServicePath string `hcl:"service_path"` + Rackspace struct { + User string `hcl:"user"` + Key string `hcl:"key"` + Region string `hcl:"region"` + } `hcl:"rackspace"` +} + var DropshipCmd = &cobra.Command{ Use: "dropship", - Short: "dropship deploys your code", - Long: "dropship monitors and automatically updates your code", + Short: "A tool for automated and distributed artifact deployment", + Long: `Dropship allows servers to automatically check, download, and install +artifacts from a file repository in a distributed fashion. + `, +} + +var CfgFile string + +func init() { + DropshipCmd.PersistentFlags().StringVar(&CfgFile, "config", "/etc/dropship.d/dropship.hcl", "config file (default is path/config.yaml|json|toml)") } func Execute() { AddCommands() - DropshipCmd.Execute() + if err := DropshipCmd.Execute(); err != nil { + os.Exit(-1) + } } func AddCommands() { - DropshipCmd.AddCommand(agent.Command) + DropshipCmd.AddCommand(agentCmd) +} + +func InitializeConfig() { + var cfg Config + cfgData, err := ioutil.ReadFile(CfgFile) + if err != nil { + log.Fatalln("Unable to locate Config File. make sure you specify it using the --config flag") + return + } + err = hcl.Decode(&cfg, string(cfgData)) + + if err != nil { + log.Fatalln("Unable to parse Config File.") + return + } + + viper.Set("servicePath", cfg.ServicePath) + viper.Set("rackspaceUser", cfg.Rackspace.User) + viper.Set("rackspaceKey", cfg.Rackspace.Key) + viper.Set("rackspaceRegion", cfg.Rackspace.Region) } diff --git a/installer/install.go b/installer/install.go new file mode 100644 index 0000000..fe5d6bc --- /dev/null +++ b/installer/install.go @@ -0,0 +1,11 @@ +package installer + +import "io" + +type Installer interface { + // Install Defines a Method that takes a destination path + // and a io.Reader and untars and gzip decodes a tarball and + // places the files inside on the FS with `dest` as their root + // It returns the number of files written and an error + Install(dest string, r io.Reader) (int, error) +} diff --git a/commands/agent/untar.go b/installer/tar.go similarity index 52% rename from commands/agent/untar.go rename to installer/tar.go index 5a6cee9..4eaf95f 100644 --- a/commands/agent/untar.go +++ b/installer/tar.go @@ -1,62 +1,65 @@ -package agent +package installer import ( "archive/tar" "compress/gzip" + "errors" "io" - "log" "os" "path/filepath" ) -func untar(fr io.Reader, dest string) error { - gr, err := gzip.NewReader(fr) - defer gr.Close() - if err != nil && err != io.EOF { - return err +var ErrNilReader = errors.New("Install: must have a non-nil Reader") + +type TarInstaller struct{} + +func (i TarInstaller) Install(dest string, fr io.Reader) (count int, err error) { + if fr == nil { + return count, ErrNilReader } - tr := tar.NewReader(gr) + gr, err := gzip.NewReader(fr) if err != nil { - return err + return } + defer gr.Close() + + tr := tar.NewReader(gr) for { - hdr, err := tr.Next() + var hdr *tar.Header + hdr, err = tr.Next() if err == io.EOF { // end of tar archive + err = nil break } if err != nil { - log.Fatalln(err) + return } path := filepath.Join(dest, hdr.Name) info := hdr.FileInfo() if info.IsDir() { if err = os.MkdirAll(path, info.Mode()); err != nil { - panic(err) + return } continue } - file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, info.Mode()) + var file *os.File + file, err = os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, info.Mode()) if err != nil { - panic(err) + return } defer file.Close() _, err = io.Copy(file, tr) if err != nil { - panic(err) + return } + count++ } - log.Println("Finished Installing") - return nil -} - -func overwrite(mpath string) (*os.File, error) { - f, err := os.Create(mpath) - return f, err + return } diff --git a/installer/tar_test.go b/installer/tar_test.go new file mode 100644 index 0000000..a5ed751 --- /dev/null +++ b/installer/tar_test.go @@ -0,0 +1,79 @@ +package installer + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "io" + "testing" +) + +func TestTarInstall(t *testing.T) { + buf, err := CreateTar() + if err != nil { + t.Error(err) + } + + var badGzip bytes.Buffer + badGzip.Write([]byte("hello")) + + var badTar bytes.Buffer + gw := gzip.NewWriter(&badTar) + gw.Write([]byte("hello")) + defer gw.Close() + + cases := []struct { + tarball io.Reader + count int + err error + }{ + {&buf, 3, nil}, + {nil, 0, ErrNilReader}, + {&badGzip, 0, io.ErrUnexpectedEOF}, + {&badTar, 0, io.ErrUnexpectedEOF}, + } + + var tarInstaller TarInstaller + for _, test := range cases { + count, err := tarInstaller.Install("/tmp/", test.tarball) + if err != test.err { + t.Errorf("Install: Expected error to equal %v got: %v", test.err, err) + } + + if count != test.count { + t.Errorf("Install: Expected % files to be installed got %v", test.count, count) + } + } +} + +func CreateTar() (buf bytes.Buffer, err error) { + // Create a new tar archive. + gw := gzip.NewWriter(&buf) + defer gw.Close() + tw := tar.NewWriter(gw) + defer tw.Close() + + // Add some files to the archive. + var files = []struct { + Name, Body string + }{ + {"readme.txt", "This archive contains some text files."}, + {"gopher.txt", "Gopher names:\nGeorge\nGeoffrey\nGonzo"}, + {"todo.txt", "Get animal handling licence."}, + } + for _, file := range files { + hdr := &tar.Header{ + Name: file.Name, + Mode: 0600, + Size: int64(len(file.Body)), + } + if err = tw.WriteHeader(hdr); err != nil { + return + } + if _, err = tw.Write([]byte(file.Body)); err != nil { + return + } + } + + return +} diff --git a/lock/consul.go b/lock/consul.go new file mode 100644 index 0000000..0cbf7bc --- /dev/null +++ b/lock/consul.go @@ -0,0 +1,40 @@ +package lock + +import ( + "os" + + "github.com/hashicorp/consul/api" +) + +type ConsulLocker struct { + semaphore *api.Semaphore +} + +func NewConsulLocker(prefix string, config *api.Config) (*ConsulLocker, error) { + client, err := api.NewClient(config) + if err != nil { + return nil, err + } + name, _ := os.Hostname() + s, err := client.SemaphoreOpts(&api.SemaphoreOptions{ + Prefix: "dropship/services/", + Limit: 1, + + SessionName: name, + }) + if err != nil { + return nil, err + } + + l := &ConsulLocker{s} + + return l, nil +} + +func (l *ConsulLocker) Acquire(shutdownCh <-chan struct{}) (<-chan struct{}, error) { + return l.semaphore.Acquire(shutdownCh) +} + +func (l *ConsulLocker) Release() error { + return l.semaphore.Release() +} diff --git a/lock/consul_test.go b/lock/consul_test.go new file mode 100644 index 0000000..ea332f9 --- /dev/null +++ b/lock/consul_test.go @@ -0,0 +1,51 @@ +package lock + +import ( + "testing" + "time" + + "github.com/hashicorp/consul/api" +) + +var ( + locker Locker + err error +) + +func TestMain(t *testing.T) { + locker, err = NewConsulLocker("dropship/services", api.DefaultConfig()) + if err != nil { + t.Error(err) + t.Fail() + } +} + +func TestConsulLockerAcquire(t *testing.T) { + lock, err := locker.Acquire(nil) + if err != nil { + t.Fatal(err) + return + } + if lock == nil { + t.Fatalf("Acquire: expected channel signal got: %v", lock) + return + } + + select { + case <-lock: + t.Fatal("Acquire: should be held") + default: + } + + err = locker.Release() + if err != nil { + t.Fatal(err) + } + + // Should lose resource + select { + case <-lock: + case <-time.After(time.Second): + t.Fatalf("Acquire: should not be held") + } +} diff --git a/lock/locker.go b/lock/locker.go new file mode 100644 index 0000000..9c1eb83 --- /dev/null +++ b/lock/locker.go @@ -0,0 +1,6 @@ +package lock + +type Locker interface { + Acquire(<-chan struct{}) (<-chan struct{}, error) + Release() error +} diff --git a/repo/rackspace/repo.go b/repo/rackspace/repo.go deleted file mode 100644 index 77ee510..0000000 --- a/repo/rackspace/repo.go +++ /dev/null @@ -1,81 +0,0 @@ -package rackspace - -import ( - "io" - "log" - - "github.com/ChrisMcKenzie/dropship/repo" - "github.com/ChrisMcKenzie/dropship/structs" - "github.com/ncw/swift" -) - -const ( - RepoName = "rackspace" - AUTH_URL = "https://identity.api.rackspacecloud.com/v2.0" -) - -type RackspaceRepo struct { - connection *swift.Connection -} - -func Setup(user, key, region string) { - if user == "" || key == "" { - return - } - - rackConnection := &swift.Connection{ - // This should be your username - UserName: user, - // This should be your api key - ApiKey: key, - // This should be a v1 auth url, eg - // Rackspace US https://auth.api.rackspacecloud.com/v1.0 - // Rackspace UK https://lon.auth.api.rackspacecloud.com/v1.0 - // Memset Memstore UK https://auth.storage.memset.com/v1.0 - AuthUrl: AUTH_URL, - // Region to use - default is use first region if unset - Region: region, - // Name of the tenant - this is likely your username - } - - repo.Register(&RackspaceRepo{rackConnection}) -} - -func (repo *RackspaceRepo) GetName() string { - return RepoName -} - -func (repo *RackspaceRepo) IsUpdated(s structs.Service) (bool, error) { - if !repo.connection.Authenticated() { - err := repo.connection.Authenticate() - if err != nil { - log.Fatal(err) - } - } - - info, _, err := repo.connection.Object( - s.Artifact.Bucket, - s.Artifact.Path, - ) - - if err != nil { - return true, err - } - - if info.Hash == s.Hash { - return true, nil - } - - return false, nil -} - -func (r *RackspaceRepo) Download(s structs.Service) (io.Reader, repo.MetaData, error) { - log.Println("Downloading", s.Artifact.Path, "from", s.Artifact.Bucket) - file, hdrs, err := r.connection.ObjectOpen( - s.Artifact.Bucket, - s.Artifact.Path, - true, - swift.Headers{}, - ) - return file, repo.MetaData{hdrs["Etag"], hdrs["Content-Type"]}, err -} diff --git a/repo/repo.go b/repo/repo.go deleted file mode 100644 index d5f3131..0000000 --- a/repo/repo.go +++ /dev/null @@ -1,36 +0,0 @@ -package repo - -import ( - "io" - - "github.com/ChrisMcKenzie/dropship/structs" -) - -type ( - Repo interface { - GetName() string - IsUpdated(structs.Service) (bool, error) - Download(structs.Service) (io.Reader, MetaData, error) - } - - MetaData struct { - Hash string - ContentType string - } -) - -var repos []Repo - -func Register(r Repo) { - repos = append(repos, r) -} - -func GetRepo(name string) Repo { - for _, repo := range repos { - if repo.GetName() == name { - return repo - } - } - - return nil -} diff --git a/service/runner.go b/service/runner.go new file mode 100644 index 0000000..01eb3cf --- /dev/null +++ b/service/runner.go @@ -0,0 +1,39 @@ +package service + +import "sync" + +type Worker interface { + Work() +} + +type Runner struct { + work chan Worker + wg sync.WaitGroup +} + +func NewRunner(maxGoRoutines int) *Runner { + r := Runner{ + work: make(chan Worker), + } + + r.wg.Add(maxGoRoutines) + for i := 0; i < maxGoRoutines; i++ { + go func() { + for w := range r.work { + w.Work() + } + r.wg.Done() + }() + } + + return &r +} + +func (r *Runner) Do(w Worker) { + r.work <- w +} + +func (r *Runner) Shutdown() { + close(r.work) + r.wg.Wait() +} diff --git a/service/service.go b/service/service.go new file mode 100644 index 0000000..382aa83 --- /dev/null +++ b/service/service.go @@ -0,0 +1,151 @@ +package service + +import ( + "errors" + "log" + "os/exec" + "strings" + "time" + + "github.com/ChrisMcKenzie/dropship/installer" + "github.com/ChrisMcKenzie/dropship/lock" + "github.com/ChrisMcKenzie/dropship/updater" + "github.com/hashicorp/consul/api" + "github.com/spf13/viper" +) + +type Artifact struct { + Type string `hcl:",key"` + Bucket string `hcl:"bucket"` + Path string `hcl:"path"` + Destination string `hcl:"destination"` +} + +type Config struct { + Name string `hcl:",key"` + CheckInterval string `hcl:"checkInterval"` + PostCommand string `hcl:postCommand` + Sequential bool `hcl:"sequentialUpdates"` + Artifact Artifact `hcl:"artifact,expand"` +} + +type Dispatcher struct { + config Config + ticker *time.Ticker + task *Runner + hash string + shutdownCh <-chan struct{} +} + +func NewDispatcher(cfg Config, t *Runner, shutdownCh <-chan struct{}) (*Dispatcher, error) { + w := Dispatcher{ + config: cfg, + task: t, + shutdownCh: shutdownCh, + } + + dur, err := time.ParseDuration(cfg.CheckInterval) + if err != nil { + return nil, err + } + w.ticker = time.NewTicker(dur) + + go w.start() + + return &w, nil +} + +func (w *Dispatcher) start() { + for { + select { + case <-w.ticker.C: + w.task.Do(w) + case _, ok := <-w.shutdownCh: + if !ok { + log.Printf("Shutting down dispatcher for %s", w.config.Name) + w.ticker.Stop() + return + } + } + } +} + +func (w *Dispatcher) Work() { + log.Printf("[INF]: Starting Update check for %s...", w.config.Name) + user := viper.GetString("rackspaceUser") + key := viper.GetString("rackspaceKey") + region := viper.GetString("rackspaceRegion") + + u := updater.NewRackspaceUpdater(user, key, region) + opts := &updater.Options{w.config.Artifact.Bucket, w.config.Artifact.Path} + + isOutOfDate, err := u.IsOutdated(w.hash, opts) + if err != nil { + log.Printf("[ERR]: Unable to check updates for %s %v", w.config.Name, err) + return + } + + if isOutOfDate { + if w.config.Sequential { + log.Printf("[INF]: Acquiring lock for %s", w.config.Name) + l, err := lock.NewConsulLocker("dropship/services/"+w.config.Name, api.DefaultConfig()) + if err != nil { + log.Printf("[ERR]: Unable to retreive update lock. %v", err) + return + } + _, err = l.Acquire(w.shutdownCh) + if err != nil { + log.Printf("[ERR]: Unable to retreive update lock. %v", err) + return + } + defer l.Release() + } + + log.Printf("[INF]: Installing update for %s...", w.config.Name) + fr, meta, err := u.Download(opts) + if err != nil { + log.Printf("[ERR]: Unable to download update for %s %v", w.config.Name, err) + return + } + + i, err := getInstaller(meta.ContentType) + if err != nil { + log.Printf("[ERR]: %s for %s", w.config.Name, err) + return + } + + filesWritten, err := i.Install(w.config.Artifact.Destination, fr) + if err != nil { + log.Printf("[ERR]: Unable to install update for %s %s", w.config.Name, err) + } + + if w.config.PostCommand != "" { + res, err := executeCommand(w.config.PostCommand) + if err != nil { + log.Printf("[ERR]: Unable to execute postComment. %v", err) + } + log.Printf("[INF]: postCommand executed successfully. %v", res) + } + + log.Printf("[INF]: Update for %s installed successfully. [hash: %s] [files written: %d]", w.config.Name, meta.Hash, filesWritten) + w.hash = meta.Hash + } else { + log.Printf("[INF]: %s is up to date", w.config.Name) + } +} + +func executeCommand(c string) (string, error) { + cmd := strings.Fields(c) + out, err := exec.Command(cmd[0], cmd[1:]...).Output() + return string(out), err +} + +func getInstaller(contentType string) (installer.Installer, error) { + switch contentType { + case "application/x-gzip": + var installer installer.TarInstaller + return installer, nil + } + + return nil, errors.New("Unable to determine installation method from file type") +} diff --git a/structs/deployment.go b/structs/deployment.go deleted file mode 100644 index e5c7586..0000000 --- a/structs/deployment.go +++ /dev/null @@ -1,23 +0,0 @@ -package structs - -type Deployment struct { - Services []Service `hcl:"service,expand"` -} - -type Service struct { - Id string `hcl:",key"` - Name string `hcl:"name"` - SequentialUpdate bool `hcl:"sequentialUpdate"` - CheckInterval string `hcl:"checkInterval"` - Command string `hcl:"command"` - Artifact ArtifactConfig `hcl:"artifact,expand"` - Hash string `hcl:"-"` -} - -type ArtifactConfig struct { - Repo string `hcl:",key"` - Bucket string `hcl:"bucket"` - Type string `hcl:"type"` - Path string `hcl:"path"` - Dest string `hcl:"destination"` -} diff --git a/updater/config.go b/updater/config.go new file mode 100644 index 0000000..6fa29a7 --- /dev/null +++ b/updater/config.go @@ -0,0 +1,12 @@ +package updater + +type ( + Options struct { + Bucket string + Path string + } + MetaData struct { + ContentType string + Hash string + } +) diff --git a/updater/rackspace.go b/updater/rackspace.go new file mode 100644 index 0000000..0862350 --- /dev/null +++ b/updater/rackspace.go @@ -0,0 +1,69 @@ +package updater + +import ( + "errors" + "io" + + "github.com/ncw/swift" +) + +var ( + ErrUnableToConnect = errors.New("RackspaceUpdater: unable to connect to rackspace.") +) + +type RackspaceUpdater struct { + conn *swift.Connection +} + +func NewRackspaceUpdater(user, key, region string) *RackspaceUpdater { + return &RackspaceUpdater{ + conn: &swift.Connection{ + // This should be your username + UserName: user, + // This should be your api key + ApiKey: key, + // This should be a v1 auth url, eg + // Rackspace US https://auth.api.rackspacecloud.com/v1.0 + // Rackspace UK https://lon.auth.api.rackspacecloud.com/v1.0 + // Memset Memstore UK https://auth.storage.memset.com/v1.0 + AuthUrl: "https://auth.api.rackspacecloud.com/v1.0", + // Region to use - default is use first region if unset + Region: region, + // Name of the tenant - this is likely your username + }, + } +} + +func (u *RackspaceUpdater) IsOutdated(hash string, opts *Options) (bool, error) { + if u.conn == nil { + return false, ErrUnableToConnect + } + + info, _, err := u.conn.Object(opts.Bucket, opts.Path) + if err != nil { + return false, err + } + + if info.Hash == hash { + return false, nil + } + + return true, nil +} + +func (u *RackspaceUpdater) Download(opt *Options) (io.ReadCloser, MetaData, error) { + var meta MetaData + if u.conn == nil { + return nil, meta, ErrUnableToConnect + } + + r, hdrs, err := u.conn.ObjectOpen(opt.Bucket, opt.Path, true, swift.Headers{}) + if err != nil { + return nil, meta, err + } + + meta.Hash = hdrs["Etag"] + meta.ContentType = hdrs["Content-Type"] + + return r, meta, nil +} diff --git a/updater/rackspace_test.go b/updater/rackspace_test.go new file mode 100644 index 0000000..859c60d --- /dev/null +++ b/updater/rackspace_test.go @@ -0,0 +1,173 @@ +package updater + +import ( + "bytes" + "fmt" + "io" + "os" + "testing" + + "github.com/ncw/swift" +) + +var ( + user, key, region string + content string = "hello world\r\n" + hash string = "a0f2a3c1dcd5b1cac71bf0c03f2ff1bd" + conn *swift.Connection + updater *RackspaceUpdater +) + +const Container = "test-container" + +func setup() error { + user = os.Getenv("RACKSPACE_USER") + key = os.Getenv("RACKSPACE_KEY") + region = os.Getenv("RACKSPACE_REGION") + if user == "" || key == "" || region == "" { + return fmt.Errorf("user or key are required") + } + + conn = &swift.Connection{ + // This should be your username + UserName: user, + // This should be your api key + ApiKey: key, + // This should be a v1 auth url, eg + // Rackspace US https://auth.api.rackspacecloud.com/v1.0 + // Rackspace UK https://lon.auth.api.rackspacecloud.com/v1.0 + // Memset Memstore UK https://auth.storage.memset.com/v1.0 + AuthUrl: "https://auth.api.rackspacecloud.com/v1.0", + // Region to use - default is use first region if unset + Region: region, + // Name of the tenant - this is likely your username + } + + // setup test container + err := conn.ContainerCreate(Container, nil) + if err != nil { + return err + } + + obj, err := conn.ObjectCreate(Container, "test.txt", false, "", "text/plain", nil) + if err != nil { + return err + } + + _, err = obj.Write([]byte(content)) + + err = obj.Close() + + return err +} + +func TestMain(t *testing.T) { + err := setup() + if err != nil { + t.Error(err) + t.Fail() + } + + updater = NewRackspaceUpdater(user, key, region) +} + +func TestRackspaceUpdaterIsOutdated(t *testing.T) { + cases := []struct { + hash string + expected bool + }{ + { + hash: "", + expected: true, + }, + { + hash: hash, + expected: false, + }, + { + hash: "this hash is random!", + expected: true, + }, + } + + for _, test := range cases { + result, err := updater.IsOutdated(test.hash, &Options{Container, "test.txt"}) + if err != nil { + t.Error(err) + } + + if result != test.expected { + t.Error(fmt.Errorf("IsOutdated:Test hash %v Expected %t got %t", test.hash, test.expected, result)) + } + } + + // negative non-existing file error + _, err := updater.IsOutdated("", &Options{Container, "no-file.txt"}) + if err == nil { + t.Error("IsOutdated: Expected an error when accessing non-existent file.") + } + + // negative generic error testing. + badUpdater := &RackspaceUpdater{} + _, err = badUpdater.IsOutdated("", &Options{}) + if err == nil { + t.Error("IsOutdated: Expected an error with invalid credentials.") + } +} + +func TestRackspaceUpdaterDownload(t *testing.T) { + cases := []struct { + Options *Options + Value string + MetaData MetaData + Error error + }{ + { + Options: &Options{ + Bucket: Container, + Path: "test.txt", + }, + MetaData: MetaData{"text/plain", hash}, + Error: nil, + Value: content, + }, + { + Options: &Options{ + Bucket: Container, + Path: "no-file.txt", + }, + Error: swift.ObjectNotFound, + Value: "", + }, + } + + for _, test := range cases { + r, meta, err := updater.Download(test.Options) + if err != test.Error { + t.Errorf("Download: Expected error to be %v got: %v", test.Error, err) + } + + if meta != test.MetaData { + t.Errorf("Download: Expected MetaData to be %v got: %v", test.MetaData, meta) + } + + if r != nil { + var data bytes.Buffer + _, err = io.Copy(&data, r) + if err != nil { + t.Error(err) + } + + if data.String() != test.Value { + t.Errorf("Download: Expected io.Reader to contain %v got: %v", test.Value, data.String()) + } + } + } + + // negative generic error testing. + badUpdater := &RackspaceUpdater{} + _, _, err := badUpdater.Download(&Options{}) + if err == nil { + t.Error("IsOutdated: Expected an error with invalid credentials.") + } +} diff --git a/updater/update.go b/updater/update.go new file mode 100644 index 0000000..998a2af --- /dev/null +++ b/updater/update.go @@ -0,0 +1,8 @@ +package updater + +import "io" + +type Updater interface { + CheckForUpdate(hash string, opts *Options) (bool, error) + Update(*Options) (io.Reader, MetaData, error) +}