Skip to content

Commit

Permalink
pubsub: support more fields for Subscription.Update
Browse files Browse the repository at this point in the history
Change-Id: Ia773a02286aa2060b631e66ffd54971bea55b7e7
Reviewed-on: https://code-review.googlesource.com/21230
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Michael Darakananda <pongad@google.com>
  • Loading branch information
jba committed Dec 19, 2017
1 parent 26c51a4 commit 8213d75
Show file tree
Hide file tree
Showing 6 changed files with 379 additions and 66 deletions.
103 changes: 98 additions & 5 deletions pubsub/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,14 @@ import (
"time"

"cloud.google.com/go/internal/testutil"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
durpb "github.com/golang/protobuf/ptypes/duration"
emptypb "github.com/golang/protobuf/ptypes/empty"
"golang.org/x/net/context"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)

type fakeServer struct {
Expand All @@ -39,6 +45,7 @@ type fakeServer struct {
Deadlines map[string]int32 // deadlines by message ID
pullResponses []*pullResponse
wg sync.WaitGroup
subs map[string]*pb.Subscription
}

type pullResponse struct {
Expand All @@ -55,6 +62,7 @@ func newFakeServer() (*fakeServer, error) {
Addr: srv.Addr,
Acked: map[string]bool{},
Deadlines: map[string]int32{},
subs: map[string]*pb.Subscription{},
}
pb.RegisterPublisherServer(srv.Gsrv, fake)
pb.RegisterSubscriberServer(srv.Gsrv, fake)
Expand Down Expand Up @@ -138,10 +146,95 @@ func (s *fakeServer) StreamingPull(stream pb.Subscriber_StreamingPullServer) err
}
}

const (
minMessageRetentionDuration = 10 * time.Minute
maxMessageRetentionDuration = 168 * time.Hour
)

var defaultMessageRetentionDuration = ptypes.DurationProto(maxMessageRetentionDuration)

func checkMRD(pmrd *durpb.Duration) error {
mrd, err := ptypes.Duration(pmrd)
if err != nil || mrd < minMessageRetentionDuration || mrd > maxMessageRetentionDuration {
return grpc.Errorf(codes.InvalidArgument, "bad message_retention_duration %+v", pmrd)
}
return nil
}

func checkAckDeadline(ads int32) error {
if ads < 10 || ads > 600 {
// PubSub service returns Unknown.
return grpc.Errorf(codes.Unknown, "bad ack_deadline_seconds: %d", ads)
}
return nil
}

func (s *fakeServer) CreateSubscription(ctx context.Context, sub *pb.Subscription) (*pb.Subscription, error) {
if s.subs[sub.Name] != nil {
return nil, grpc.Errorf(codes.AlreadyExists, "subscription %q", sub.Name)
}
sub2 := proto.Clone(sub).(*pb.Subscription)
if err := checkAckDeadline(sub.AckDeadlineSeconds); err != nil {
return nil, err
}
if sub.MessageRetentionDuration == nil {
sub2.MessageRetentionDuration = defaultMessageRetentionDuration
}
if err := checkMRD(sub2.MessageRetentionDuration); err != nil {
return nil, err
}
if sub.PushConfig == nil {
sub2.PushConfig = &pb.PushConfig{}
}
s.subs[sub.Name] = sub2
return sub2, nil
}

func (s *fakeServer) GetSubscription(ctx context.Context, req *pb.GetSubscriptionRequest) (*pb.Subscription, error) {
return &pb.Subscription{
Name: req.Subscription,
AckDeadlineSeconds: 10,
PushConfig: &pb.PushConfig{},
}, nil
if sub := s.subs[req.Subscription]; sub != nil {
return sub, nil
}
return nil, grpc.Errorf(codes.NotFound, "subscription %q", req.Subscription)
}

func (s *fakeServer) UpdateSubscription(ctx context.Context, req *pb.UpdateSubscriptionRequest) (*pb.Subscription, error) {
sub := s.subs[req.Subscription.Name]
if sub == nil {
return nil, grpc.Errorf(codes.NotFound, "subscription %q", req.Subscription.Name)
}
for _, path := range req.UpdateMask.Paths {
switch path {
case "push_config":
sub.PushConfig = req.Subscription.PushConfig

case "ack_deadline_seconds":
a := req.Subscription.AckDeadlineSeconds
if err := checkAckDeadline(a); err != nil {
return nil, err
}
sub.AckDeadlineSeconds = a

case "retain_acked_messages":
sub.RetainAckedMessages = req.Subscription.RetainAckedMessages

case "message_retention_duration":
if err := checkMRD(req.Subscription.MessageRetentionDuration); err != nil {
return nil, err
}
sub.MessageRetentionDuration = req.Subscription.MessageRetentionDuration

// TODO(jba): labels
default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknown field name %q", path)
}
}
return sub, nil
}

func (s *fakeServer) DeleteSubscription(ctx context.Context, req *pb.DeleteSubscriptionRequest) (*emptypb.Empty, error) {
if _, ok := s.subs[req.Subscription]; !ok {
return nil, grpc.Errorf(codes.NotFound, "subscription %q", req.Subscription)
}
delete(s.subs, req.Subscription)
return &emptypb.Empty{}, nil
}
116 changes: 101 additions & 15 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"cloud.google.com/go/internal/testutil"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)

var (
Expand Down Expand Up @@ -295,40 +297,62 @@ func TestSubscriptionUpdate(t *testing.T) {
}
defer sub.Delete(ctx)

sc, err := sub.Config(ctx)
got, err := sub.Config(ctx)
if err != nil {
t.Fatal(err)
}
if !testutil.Equal(sc.PushConfig, PushConfig{}) {
t.Fatalf("got %+v, want empty PushConfig", sc.PushConfig)
want := SubscriptionConfig{
Topic: topic,
AckDeadline: 10 * time.Second,
RetainAckedMessages: false,
RetentionDuration: defaultRetentionDuration,
}
// Add a PushConfig.
if !testutil.Equal(got, want) {
t.Fatalf("\ngot %+v\nwant %+v", got, want)
}
// Add a PushConfig and change other fields.
projID := testutil.ProjID()
pc := PushConfig{
Endpoint: "https://" + projID + ".appspot.com/_ah/push-handlers/push",
Attributes: map[string]string{"x-goog-version": "v1"},
}
sc, err = sub.Update(ctx, SubscriptionConfigToUpdate{PushConfig: &pc})
got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
PushConfig: &pc,
AckDeadline: 2 * time.Minute,
RetainAckedMessages: true,
RetentionDuration: 2 * time.Hour,
})
if err != nil {
t.Fatal(err)
}
// Despite the docs which say that Get always returns a valid "x-goog-version"
// attribute, none is returned. See
// https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PushConfig
pc.Attributes = nil
if got, want := sc.PushConfig, pc; !testutil.Equal(got, want) {
t.Fatalf("setting push config: got\n%+v\nwant\n%+v", got, want)
want = SubscriptionConfig{
Topic: topic,
PushConfig: pc,
AckDeadline: 2 * time.Minute,
RetainAckedMessages: true,
RetentionDuration: 2 * time.Hour,
}
if !testutil.Equal(got, want) {
t.Fatalf("\ngot %+v\nwant %+v", got, want)
}
// Remove the PushConfig, turning the subscription back into pull mode.
// Change AckDeadline, but nothing else.
pc = PushConfig{}
sc, err = sub.Update(ctx, SubscriptionConfigToUpdate{PushConfig: &pc})
got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
PushConfig: &pc,
AckDeadline: 30 * time.Second,
})
if err != nil {
t.Fatal(err)
}
if got, want := sc.PushConfig, pc; !testutil.Equal(got, want) {
t.Fatalf("removing push config: got\n%+v\nwant %+v", got, want)
want.PushConfig = pc
want.AckDeadline = 30 * time.Second
// service issue: PushConfig attributes are not removed.
// TODO(jba): remove when issue resolved.
want.PushConfig.Attributes = map[string]string{"x-goog-version": "v1"}
if !testutil.Equal(got, want) {
t.Fatalf("\ngot %+v\nwant %+v", got, want)
}

// If nothing changes, our client returns an error.
_, err = sub.Update(ctx, SubscriptionConfigToUpdate{})
if err == nil {
Expand Down Expand Up @@ -360,3 +384,65 @@ func TestPublicTopic(t *testing.T) {
t.Fatal(err)
}
}

func TestIntegration_Errors(t *testing.T) {
// Test various edge conditions.
t.Parallel()
ctx := context.Background()
client := integrationTestClient(t, ctx)
defer client.Close()

topic, err := client.CreateTopic(ctx, topicIDs.New())
if err != nil {
t.Fatalf("CreateTopic error: %v", err)
}
defer topic.Stop()
defer topic.Delete(ctx)

// Out-of-range retention duration.
sub, err := client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
Topic: topic,
RetentionDuration: 1 * time.Second,
})
if want := codes.InvalidArgument; grpc.Code(err) != want {
t.Errorf("got <%v>, want %s", err, want)
}
if err == nil {
sub.Delete(ctx)
}

// Negative ack deadline.
sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
Topic: topic,
AckDeadline: 5 * time.Second,
})
if want := codes.Unknown; grpc.Code(err) != want {
t.Errorf("got <%v>, want %s", err, want)
}
if err == nil {
sub.Delete(ctx)
}

// Updating a non-existent subscription.
sub = client.Subscription(subIDs.New())
_, err = sub.Update(ctx, SubscriptionConfigToUpdate{AckDeadline: 20 * time.Second})
if want := codes.NotFound; grpc.Code(err) != want {
t.Errorf("got <%v>, want %s", err, want)
}
// Deleting a non-existent subscription.
err = sub.Delete(ctx)
if want := codes.NotFound; grpc.Code(err) != want {
t.Errorf("got <%v>, want %s", err, want)
}

// Updating out-of-range retention duration.
sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic})
if err != nil {
t.Fatal(err)
}
defer sub.Delete(ctx)
_, err = sub.Update(ctx, SubscriptionConfigToUpdate{RetentionDuration: 1000 * time.Hour})
if want := codes.InvalidArgument; grpc.Code(err) != want {
t.Errorf("got <%v>, want %s", err, want)
}
}
42 changes: 5 additions & 37 deletions pubsub/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"cloud.google.com/go/iam"
"cloud.google.com/go/internal/version"
vkit "cloud.google.com/go/pubsub/apiv1"
durpb "github.com/golang/protobuf/ptypes/duration"
gax "github.com/googleapis/gax-go"
"golang.org/x/net/context"
"google.golang.org/api/option"
Expand Down Expand Up @@ -115,26 +114,7 @@ func (s *apiService) close() error {
}

func (s *apiService) createSubscription(ctx context.Context, subName string, cfg SubscriptionConfig) error {
var rawPushConfig *pb.PushConfig
if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 {
rawPushConfig = &pb.PushConfig{
Attributes: cfg.PushConfig.Attributes,
PushEndpoint: cfg.PushConfig.Endpoint,
}
}
var retentionDuration *durpb.Duration
if cfg.RetentionDuration != 0 {
retentionDuration = ptypes.DurationProto(cfg.RetentionDuration)
}

_, err := s.subc.CreateSubscription(ctx, &pb.Subscription{
Name: subName,
Topic: cfg.Topic.name,
PushConfig: rawPushConfig,
AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())),
RetainAckedMessages: cfg.RetainAckedMessages,
MessageRetentionDuration: retentionDuration,
})
_, err := s.subc.CreateSubscription(ctx, cfg.toProto(subName))
return err
}

Expand All @@ -143,23 +123,11 @@ func (s *apiService) getSubscriptionConfig(ctx context.Context, subName string)
if err != nil {
return SubscriptionConfig{}, "", err
}
var rd time.Duration
// TODO(pongad): Remove nil-check after white list is removed.
if rawSub.MessageRetentionDuration != nil {
if rd, err = ptypes.Duration(rawSub.MessageRetentionDuration); err != nil {
return SubscriptionConfig{}, "", err
}
}
sub := SubscriptionConfig{
AckDeadline: time.Second * time.Duration(rawSub.AckDeadlineSeconds),
PushConfig: PushConfig{
Endpoint: rawSub.PushConfig.PushEndpoint,
Attributes: rawSub.PushConfig.Attributes,
},
RetainAckedMessages: rawSub.RetainAckedMessages,
RetentionDuration: rd,
cfg, err := protoToSubscriptionConfig(rawSub, s)
if err != nil {
return SubscriptionConfig{}, "", err
}
return sub, rawSub.Topic, nil
return cfg, rawSub.Topic, nil
}

// stringsPage contains a list of strings and a token for fetching the next page.
Expand Down
Loading

0 comments on commit 8213d75

Please sign in to comment.