From 924074139a086aec7f12572d05909ee0b54e21f5 Mon Sep 17 00:00:00 2001 From: Eric Schmidt Date: Wed, 26 Oct 2022 12:09:48 -0700 Subject: [PATCH] feat(datastore): adds snapshot reads (#6755) * feat(datastore): adds snapshot reads --- datastore/datastore.go | 71 ++++++++++++++++--- datastore/datastore_test.go | 125 +++++++++++++++++++++++++++++++--- datastore/integration_test.go | 31 ++++++++- datastore/transaction.go | 27 +++++++- datastore/transaction_test.go | 13 ++++ 5 files changed, 248 insertions(+), 19 deletions(-) diff --git a/datastore/datastore.go b/datastore/datastore.go index a6ee9db61bcc..2f5a1d609a58 100644 --- a/datastore/datastore.go +++ b/datastore/datastore.go @@ -21,6 +21,7 @@ import ( "log" "os" "reflect" + "time" "cloud.google.com/go/internal/trace" "google.golang.org/api/option" @@ -28,6 +29,7 @@ import ( gtransport "google.golang.org/api/transport/grpc" pb "google.golang.org/genproto/googleapis/datastore/v1" "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/timestamppb" ) const ( @@ -53,9 +55,10 @@ const resourcePrefixHeader = "google-cloud-resource-prefix" // Client is a client for reading and writing data in a datastore dataset. type Client struct { - connPool gtransport.ConnPool - client pb.DatastoreClient - dataset string // Called dataset by the datastore API, synonym for project ID. + connPool gtransport.ConnPool + client pb.DatastoreClient + dataset string // Called dataset by the datastore API, synonym for project ID. + readSettings *readSettings } // NewClient creates a new Client for a given dataset. If the project ID is @@ -118,9 +121,10 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio return nil, fmt.Errorf("dialing: %w", err) } return &Client{ - connPool: connPool, - client: newDatastoreClient(connPool, projectID), - dataset: projectID, + connPool: connPool, + client: newDatastoreClient(connPool, projectID), + dataset: projectID, + readSettings: &readSettings{}, }, nil } @@ -348,7 +352,17 @@ func (c *Client) Get(ctx context.Context, key *Key, dst interface{}) (err error) if dst == nil { // get catches nil interfaces; we need to catch nil ptr here return ErrInvalidEntityType } - err = c.get(ctx, []*Key{key}, []interface{}{dst}, nil) + + var opts *pb.ReadOptions + if !c.readSettings.readTime.IsZero() { + opts = &pb.ReadOptions{ + ConsistencyType: &pb.ReadOptions_ReadTime{ + ReadTime: timestamppb.New(c.readSettings.readTime), + }, + } + } + + err = c.get(ctx, []*Key{key}, []interface{}{dst}, opts) if me, ok := err.(MultiError); ok { return me[0] } @@ -371,7 +385,16 @@ func (c *Client) GetMulti(ctx context.Context, keys []*Key, dst interface{}) (er ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.GetMulti") defer func() { trace.EndSpan(ctx, err) }() - return c.get(ctx, keys, dst, nil) + var opts *pb.ReadOptions + if c.readSettings != nil && !c.readSettings.readTime.IsZero() { + opts = &pb.ReadOptions{ + ConsistencyType: &pb.ReadOptions_ReadTime{ + ReadTime: timestamppb.New(c.readSettings.readTime), + }, + } + } + + return c.get(ctx, keys, dst, opts) } func (c *Client) get(ctx context.Context, keys []*Key, dst interface{}, opts *pb.ReadOptions) error { @@ -679,3 +702,35 @@ func (c *Client) Mutate(ctx context.Context, muts ...*Mutation) (ret []*Key, err } return ret, nil } + +// ReadTime specifies a snapshot (time) of the database to read. +func ReadTime(t time.Time) ReadOption { + return docReadTime(t) +} + +type docReadTime time.Time + +func (drt docReadTime) apply(rs *readSettings) { + rs.readTime = time.Time(drt) +} + +// ReadOption provides specific instructions for how to access documents in the database. +// Currently, only ReadTime is supported. +type ReadOption interface { + apply(*readSettings) +} + +type readSettings struct { + readTime time.Time +} + +// WithReadOptions specifies constraints for accessing documents from the database, +// e.g. at what time snapshot to read the documents. +// The client uses this value for subsequent reads, unless additional ReadOptions +// are provided. +func (c *Client) WithReadOptions(ro ...ReadOption) *Client { + for _, r := range ro { + r.apply(c.readSettings) + } + return c +} diff --git a/datastore/datastore_test.go b/datastore/datastore_test.go index ff6cbf8b6680..2671fa686bce 100644 --- a/datastore/datastore_test.go +++ b/datastore/datastore_test.go @@ -17,9 +17,11 @@ package datastore import ( "context" "errors" + "fmt" "sort" "strings" "testing" + "time" "cloud.google.com/go/internal/testutil" "github.com/google/go-cmp/cmp" @@ -292,6 +294,110 @@ func TestPutMultiTypes(t *testing.T) { } } +func TestGetWithReadTime(t *testing.T) { + type ent struct { + A int + } + tm := time.Now() + k := NameKey("testKind", "testReadTime", nil) + e := &pb.Entity{ + Key: keyToProto(k), + Properties: map[string]*pb.Value{ + "A": {ValueType: &pb.Value_IntegerValue{IntegerValue: 1}}, + }, + } + fakeClient := &fakeDatastoreClient{ + lookup: func(req *pb.LookupRequest) (*pb.LookupResponse, error) { + if !req.ReadOptions.GetReadTime().AsTime().Equal(tm) { + return nil, fmt.Errorf("read time mismatch: expected %v, got %v", tm, + req.ReadOptions.GetReadTime()) + } + + return &pb.LookupResponse{ + Found: []*pb.EntityResult{ + { + Entity: e, + Version: 1, + }, + }, + }, nil + }, + } + + client := &Client{ + client: fakeClient, + readSettings: &readSettings{}, + } + + ctx := context.Background() + client.WithReadOptions(ReadTime(tm)) + dst := &ent{} + err := client.Get(ctx, k, dst) + if err != nil { + t.Fatalf("Get() with ReadTime failed: %v\n", err) + } +} + +func TestGetMultiWithReadTime(t *testing.T) { + type ent struct { + A int + } + + tm := time.Now() + k := []*Key{ + NameKey("testKind", "testReadTime", nil), + NameKey("testKind", "testReadTime2", nil), + } + + e := &pb.Entity{ + Key: keyToProto(k[0]), + Properties: map[string]*pb.Value{ + "A": {ValueType: &pb.Value_IntegerValue{IntegerValue: 1}}, + }, + } + e2 := &pb.Entity{ + Key: keyToProto(k[1]), + Properties: map[string]*pb.Value{ + "A": {ValueType: &pb.Value_IntegerValue{IntegerValue: 1}}, + }, + } + + fakeClient := &fakeDatastoreClient{ + lookup: func(req *pb.LookupRequest) (*pb.LookupResponse, error) { + + if !req.ReadOptions.GetReadTime().AsTime().Equal(tm) { + return nil, fmt.Errorf("read time mismatch: expected %v, got %v", tm, + req.ReadOptions.GetReadTime()) + } + + return &pb.LookupResponse{ + Found: []*pb.EntityResult{ + { + Entity: e, + Version: 1, + }, { + Entity: e2, + Version: 1, + }, + }, + }, nil + }, + } + + client := &Client{ + client: fakeClient, + readSettings: &readSettings{}, + } + + ctx := context.Background() + client.WithReadOptions(ReadTime(tm)) + dst := make([]*ent, len(k)) + err := client.GetMulti(ctx, k, dst) + if err != nil { + t.Fatalf("Get() with ReadTime failed: %v\n", err) + } +} + func TestNoIndexOnSliceProperties(t *testing.T) { // Check that ExcludeFromIndexes is set on the inner elements, // rather than the top-level ArrayValue value. @@ -500,7 +606,8 @@ func TestDeferred(t *testing.T) { }, } client := &Client{ - client: fakeClient, + client: fakeClient, + readSettings: &readSettings{}, } ctx := context.Background() @@ -620,7 +727,7 @@ func TestDeferredMissing(t *testing.T) { } func TestGetWithNilKey(t *testing.T) { - client := &Client{} + client := &Client{readSettings: &readSettings{}} err := client.Get(context.Background(), nil, []Property{}) if err != ErrInvalidKey { t.Fatalf("want ErrInvalidKey, got %v", err) @@ -628,7 +735,7 @@ func TestGetWithNilKey(t *testing.T) { } func TestGetMultiWithNilKey(t *testing.T) { - client := &Client{} + client := &Client{readSettings: &readSettings{}} dest := make([]PropertyList, 1) err := client.GetMulti(context.Background(), []*Key{nil}, dest) if me, ok := err.(MultiError); !ok { @@ -639,7 +746,7 @@ func TestGetMultiWithNilKey(t *testing.T) { } func TestGetWithIncompleteKey(t *testing.T) { - client := &Client{} + client := &Client{readSettings: &readSettings{}} err := client.Get(context.Background(), &Key{Kind: "testKind"}, []Property{}) if err == nil { t.Fatalf("want err, got nil") @@ -647,7 +754,7 @@ func TestGetWithIncompleteKey(t *testing.T) { } func TestGetMultiWithIncompleteKey(t *testing.T) { - client := &Client{} + client := &Client{readSettings: &readSettings{}} dest := make([]PropertyList, 1) err := client.GetMulti(context.Background(), []*Key{{Kind: "testKind"}}, dest) if me, ok := err.(MultiError); !ok { @@ -658,7 +765,7 @@ func TestGetMultiWithIncompleteKey(t *testing.T) { } func TestDeleteWithNilKey(t *testing.T) { - client := &Client{} + client := &Client{readSettings: &readSettings{}} err := client.Delete(context.Background(), nil) if err != ErrInvalidKey { t.Fatalf("want ErrInvalidKey, got %v", err) @@ -666,7 +773,7 @@ func TestDeleteWithNilKey(t *testing.T) { } func TestDeleteMultiWithNilKey(t *testing.T) { - client := &Client{} + client := &Client{readSettings: &readSettings{}} err := client.DeleteMulti(context.Background(), []*Key{nil}) if me, ok := err.(MultiError); !ok { t.Fatalf("want MultiError, got %v", err) @@ -676,7 +783,7 @@ func TestDeleteMultiWithNilKey(t *testing.T) { } func TestDeleteWithIncompleteKey(t *testing.T) { - client := &Client{} + client := &Client{readSettings: &readSettings{}} err := client.Delete(context.Background(), &Key{Kind: "testKind"}) if err == nil { t.Fatalf("want err, got nil") @@ -684,7 +791,7 @@ func TestDeleteWithIncompleteKey(t *testing.T) { } func TestDeleteMultiWithIncompleteKey(t *testing.T) { - client := &Client{} + client := &Client{readSettings: &readSettings{}} err := client.DeleteMulti(context.Background(), []*Key{{Kind: "testKind"}}) if me, ok := err.(MultiError); !ok { t.Fatalf("want MultiError, got %v", err) diff --git a/datastore/integration_test.go b/datastore/integration_test.go index 49e702f38cf0..46c414c83ce3 100644 --- a/datastore/integration_test.go +++ b/datastore/integration_test.go @@ -198,6 +198,34 @@ func TestIntegration_Basics(t *testing.T) { } } +func TestIntegration_GetWithReadTime(t *testing.T) { + ctx, _ := context.WithTimeout(context.Background(), time.Second*20) + client := newTestClient(ctx, t) + defer client.Close() + + type X struct { + I int + S string + T time.Time + U interface{} + } + + x0 := X{66, "99", timeNow.Truncate(time.Millisecond), "X"} + k, err := client.Put(ctx, IncompleteKey("BasicsX", nil), &x0) + if err != nil { + t.Fatalf("client.Put: %v", err) + } + x1 := X{} + client.WithReadOptions(ReadTime(time.Now())) + err = client.Get(ctx, k, &x1) + if err != nil { + t.Errorf("client.Get: %v", err) + } + + // Cleanup + _ = client.Delete(ctx, k) +} + func TestIntegration_TopLevelKeyLoaded(t *testing.T) { ctx, _ := context.WithTimeout(context.Background(), time.Second*20) client := newTestClient(ctx, t) @@ -292,7 +320,8 @@ func TestIntegration_GetMulti(t *testing.T) { if _, err := client.PutMulti(ctx, srcKeys, src); err != nil { t.Error(err) } - err := client.GetMulti(ctx, dstKeys, dst) + + err := client.WithReadOptions(ReadTime(time.Now())).GetMulti(ctx, dstKeys, dst) if err == nil { t.Errorf("client.GetMulti got %v, expected error", err) } diff --git a/datastore/transaction.go b/datastore/transaction.go index f66209a254b7..0f2c1686afe0 100644 --- a/datastore/transaction.go +++ b/datastore/transaction.go @@ -17,11 +17,13 @@ package datastore import ( "context" "errors" + "time" "cloud.google.com/go/internal/trace" pb "google.golang.org/genproto/googleapis/datastore/v1" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" ) // ErrConcurrentTransaction is returned when a transaction is rolled back due @@ -34,6 +36,7 @@ type transactionSettings struct { attempts int readOnly bool prevID []byte // ID of the transaction to retry + readTime *timestamppb.Timestamp } // newTransactionSettings creates a transactionSettings with a given TransactionOption slice. @@ -67,6 +70,22 @@ func (w maxAttempts) apply(s *transactionSettings) { } } +// WithReadTime returns a TransactionOption that specifies a snapshot of the +// database to view. +func WithReadTime(t time.Time) TransactionOption { + return readTime{t} +} + +type readTime struct { + time.Time +} + +func (rt readTime) apply(s *transactionSettings) { + if !rt.Time.IsZero() { + s.readTime = timestamppb.New(rt.Time) + } +} + // ReadOnly is a TransactionOption that marks the transaction as read-only. var ReadOnly TransactionOption @@ -116,9 +135,15 @@ func (c *Client) newTransaction(ctx context.Context, s *transactionSettings) (_ ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Transaction.ReadOnlyTransaction") defer func() { trace.EndSpan(ctx, err) }() + ro := &pb.TransactionOptions_ReadOnly{} + if !s.readTime.AsTime().IsZero() { + ro.ReadTime = s.readTime + } + req.TransactionOptions = &pb.TransactionOptions{ - Mode: &pb.TransactionOptions_ReadOnly_{ReadOnly: &pb.TransactionOptions_ReadOnly{}}, + Mode: &pb.TransactionOptions_ReadOnly_{ReadOnly: ro}, } + } else if s.prevID != nil { ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Transaction.ReadWriteTransaction") defer func() { trace.EndSpan(ctx, err) }() diff --git a/datastore/transaction_test.go b/datastore/transaction_test.go index 31d7e9fcbac4..08b66f7c00b0 100644 --- a/datastore/transaction_test.go +++ b/datastore/transaction_test.go @@ -20,6 +20,7 @@ import ( "github.com/golang/protobuf/proto" pb "google.golang.org/genproto/googleapis/datastore/v1" + "google.golang.org/protobuf/types/known/timestamppb" ) func TestNewTransaction(t *testing.T) { @@ -36,6 +37,7 @@ func TestNewTransaction(t *testing.T) { }, } ctx := context.Background() + rt := timestamppb.Now() for _, test := range []struct { settings *transactionSettings want *pb.BeginTransactionRequest @@ -65,6 +67,17 @@ func TestNewTransaction(t *testing.T) { }, }, }, + { + &transactionSettings{readOnly: true, readTime: rt}, + &pb.BeginTransactionRequest{ + ProjectId: "project", + TransactionOptions: &pb.TransactionOptions{ + Mode: &pb.TransactionOptions_ReadOnly_{ReadOnly: &pb.TransactionOptions_ReadOnly{ + ReadTime: rt, + }}, + }, + }, + }, } { _, err := client.newTransaction(ctx, test.settings) if err != nil {