Skip to content
Open
32 changes: 32 additions & 0 deletions pipelines/internal/commands/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ import (
"strings"
"time"

"cloud.google.com/go/pubsub"
"github.com/googlegenomics/pipelines-tools/pipelines/internal/commands/watch"
"github.com/googlegenomics/pipelines-tools/pipelines/internal/common"
"golang.org/x/oauth2/google"
Expand Down Expand Up @@ -172,6 +173,7 @@ var (
cosChannel = flags.String("cos-channel", "", "if set, specifies the COS release channel to use")
serviceAccount = flags.String("service-account", "", "if set, specifies the service account for the VM")
outputInterval = flags.Duration("output-interval", 0, "if non-zero, specifies the time interval for logging output during runs")
pubSub = flags.Bool("pub-sub", true, "if true, attempt to use Pub/Sub to monitor the operation state")
)

func init() {
Expand Down Expand Up @@ -213,6 +215,10 @@ func runPipeline(ctx context.Context, service *genomics.Service, req *genomics.R
abort := make(chan os.Signal, 1)
signal.Notify(abort, os.Interrupt)

if *pubSub {
req.PubSubTopic = pubSubTopic(ctx, req.Pipeline.Resources.ProjectId)
}

attempt := uint(1)
for {
req.Pipeline.Resources.VirtualMachine.Preemptible = (attempt <= *pvmAttempts)
Expand Down Expand Up @@ -250,6 +256,32 @@ func runPipeline(ctx context.Context, service *genomics.Service, req *genomics.R
}
}

func pubSubTopic(ctx context.Context, projectID string) string {
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return ""
}

topic := client.Topic("pipelines-tool")
exists, err := topic.Exists(ctx)
if err != nil {
return ""
}
if exists {
if config, err := topic.Config(ctx); err != nil || config.Labels["created-by"] != "pipelines-tool" {
return ""
}
return topic.String()
}

t, err := client.CreateTopic(ctx, "pipelines-tool")
if err != nil {
return ""
}
t.Update(ctx, pubsub.TopicConfigToUpdate{Labels: map[string]string{"created-by": "pipelines-tool"}})
return t.String()
}

func parseJSON(filename string, v interface{}) error {
f, err := os.Open(filename)
if err != nil {
Expand Down
138 changes: 117 additions & 21 deletions pipelines/internal/commands/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@ package watch

import (
"context"
"crypto/rand"
"encoding/binary"
"encoding/json"
"errors"
"flag"
"fmt"
"strings"
"sync"
"time"

"cloud.google.com/go/pubsub"
"github.com/googlegenomics/pipelines-tools/pipelines/internal/common"
genomics "google.golang.org/api/genomics/v2alpha1"
)
Expand All @@ -41,7 +46,7 @@ func Invoke(ctx context.Context, service *genomics.Service, project string, argu
}

name := common.ExpandOperationName(project, names[0])
result, err := watch(ctx, service, name)
result, err := watch(ctx, service, project, name)
if err != nil {
return fmt.Errorf("watching pipeline: %v", err)
}
Expand All @@ -54,28 +59,35 @@ func Invoke(ctx context.Context, service *genomics.Service, project string, argu
return nil
}

func watch(ctx context.Context, service *genomics.Service, name string) (interface{}, error) {
func watch(ctx context.Context, service *genomics.Service, project, name string) (interface{}, error) {
lro, err := service.Projects.Operations.Get(name).Context(ctx).Do()
if err != nil {
return nil, fmt.Errorf("getting operation status: %v", err)
}

var metadata genomics.Metadata
if err := json.Unmarshal(lro.Metadata, &metadata); err != nil {
return nil, fmt.Errorf("parsing metadata: %v", err)
}

if *actions {
encoded, err := json.MarshalIndent(metadata.Pipeline.Actions, "", " ")
if err != nil {
return nil, fmt.Errorf("encoding actions: %v", err)
}
fmt.Printf("%s\n", encoded)
}

var events []*genomics.Event
const initialDelay = 5 * time.Second
delay := initialDelay
for {
scan := func(ctx context.Context) (bool, interface{}, error) {
lro, err := service.Projects.Operations.Get(name).Context(ctx).Do()
if err != nil {
return nil, fmt.Errorf("getting operation status: %v", err)
return false, nil, fmt.Errorf("getting operation status: %v", err)
}

var metadata genomics.Metadata
if err := json.Unmarshal(lro.Metadata, &metadata); err != nil {
return nil, fmt.Errorf("parsing metadata: %v", err)
}

if *actions {
*actions = false
encoded, err := json.MarshalIndent(metadata.Pipeline.Actions, "", " ")
if err != nil {
return nil, fmt.Errorf("encoding actions: %v", err)
}
fmt.Printf("%s\n", encoded)
return false, nil, fmt.Errorf("parsing metadata: %v", err)
}

if len(events) != len(metadata.Events) {
Expand All @@ -88,20 +100,104 @@ func watch(ctx context.Context, service *genomics.Service, name string) (interfa
}
}
events = metadata.Events
delay = initialDelay
}

if lro.Done {
if lro.Error != nil {
return true, lro.Error, nil
}
return true, lro.Response, nil
}
return false, nil, nil
}

if metadata.PubSubTopic != "" {
sub, err := newPubSubSubscription(ctx, project, metadata.PubSubTopic)
if err != nil {
return nil, fmt.Errorf("creating Pub/Sub subscription: %v", err)
}
defer sub.Delete(ctx)

// Check if the operation finished before creating the subscription.
if done, result, err := scan(ctx); err != nil || done {
return result, err
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

var response interface{}
var receiverErr error
var receiverLock sync.Mutex
err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
receiverLock.Lock()
defer receiverLock.Unlock()
m.Ack()

exit := func(r interface{}, err error) {
if ctx.Err() != nil {
return
}
response = r
receiverErr = err
cancel()
}

if done, result, err := scan(ctx); err != nil || done {
exit(result, err)
}
})
if err != nil && err != context.Canceled {
return nil, fmt.Errorf("receiving message: %v", err)
}
return response, receiverErr
} else {
if lro.Done {
if lro.Error != nil {
return lro.Error, nil
}
return lro.Response, nil
}

time.Sleep(delay)
delay = time.Duration(float64(delay) * 1.5)
if limit := time.Minute; delay > limit {
delay = limit
const initialDelay = 5 * time.Second
delay := initialDelay
for {
time.Sleep(delay)
delay = time.Duration(float64(delay) * 1.5)
if limit := time.Minute; delay > limit {
delay = limit
}

if done, result, err := scan(ctx); err != nil || done {
return result, err
}
}
}
}

func newPubSubSubscription(ctx context.Context, projectID, topic string) (*pubsub.Subscription, error) {
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return nil, fmt.Errorf("creating a Pub/Sub client: %v", err)
}

var id uint64
if err := binary.Read(rand.Reader, binary.LittleEndian, &id); err != nil {
return nil, fmt.Errorf("generating subscription name: %v", err)
}

el := strings.Split(topic, "/")
if len(el) < 4 {
return nil, fmt.Errorf("invalid Pub/Sub topic")
}

sub, err := client.CreateSubscription(ctx, fmt.Sprintf("s%d", id), pubsub.SubscriptionConfig{
Topic: client.TopicInProject(el[3], el[1]),
AckDeadline: 10 * time.Second,
ExpirationPolicy: 25 * time.Hour,
})
if err != nil {
return nil, fmt.Errorf("creating subscription: %v", err)
}
return sub, nil
}