Skip to content

Commit

Permalink
feat(datastore): adds snapshot reads (#6755)
Browse files Browse the repository at this point in the history
* feat(datastore): adds snapshot reads
  • Loading branch information
telpirion authored Oct 26, 2022
1 parent 89d8695 commit 9240741
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 19 deletions.
71 changes: 63 additions & 8 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
"log"
"os"
"reflect"
"time"

"cloud.google.com/go/internal/trace"
"google.golang.org/api/option"
"google.golang.org/api/transport"
gtransport "google.golang.org/api/transport/grpc"
pb "google.golang.org/genproto/googleapis/datastore/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
Expand All @@ -53,9 +55,10 @@ const resourcePrefixHeader = "google-cloud-resource-prefix"

// Client is a client for reading and writing data in a datastore dataset.
type Client struct {
connPool gtransport.ConnPool
client pb.DatastoreClient
dataset string // Called dataset by the datastore API, synonym for project ID.
connPool gtransport.ConnPool
client pb.DatastoreClient
dataset string // Called dataset by the datastore API, synonym for project ID.
readSettings *readSettings
}

// NewClient creates a new Client for a given dataset. If the project ID is
Expand Down Expand Up @@ -118,9 +121,10 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
return nil, fmt.Errorf("dialing: %w", err)
}
return &Client{
connPool: connPool,
client: newDatastoreClient(connPool, projectID),
dataset: projectID,
connPool: connPool,
client: newDatastoreClient(connPool, projectID),
dataset: projectID,
readSettings: &readSettings{},
}, nil
}

Expand Down Expand Up @@ -348,7 +352,17 @@ func (c *Client) Get(ctx context.Context, key *Key, dst interface{}) (err error)
if dst == nil { // get catches nil interfaces; we need to catch nil ptr here
return ErrInvalidEntityType
}
err = c.get(ctx, []*Key{key}, []interface{}{dst}, nil)

var opts *pb.ReadOptions
if !c.readSettings.readTime.IsZero() {
opts = &pb.ReadOptions{
ConsistencyType: &pb.ReadOptions_ReadTime{
ReadTime: timestamppb.New(c.readSettings.readTime),
},
}
}

err = c.get(ctx, []*Key{key}, []interface{}{dst}, opts)
if me, ok := err.(MultiError); ok {
return me[0]
}
Expand All @@ -371,7 +385,16 @@ func (c *Client) GetMulti(ctx context.Context, keys []*Key, dst interface{}) (er
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.GetMulti")
defer func() { trace.EndSpan(ctx, err) }()

return c.get(ctx, keys, dst, nil)
var opts *pb.ReadOptions
if c.readSettings != nil && !c.readSettings.readTime.IsZero() {
opts = &pb.ReadOptions{
ConsistencyType: &pb.ReadOptions_ReadTime{
ReadTime: timestamppb.New(c.readSettings.readTime),
},
}
}

return c.get(ctx, keys, dst, opts)
}

func (c *Client) get(ctx context.Context, keys []*Key, dst interface{}, opts *pb.ReadOptions) error {
Expand Down Expand Up @@ -679,3 +702,35 @@ func (c *Client) Mutate(ctx context.Context, muts ...*Mutation) (ret []*Key, err
}
return ret, nil
}

// ReadTime specifies a snapshot (time) of the database to read.
func ReadTime(t time.Time) ReadOption {
return docReadTime(t)
}

type docReadTime time.Time

func (drt docReadTime) apply(rs *readSettings) {
rs.readTime = time.Time(drt)
}

// ReadOption provides specific instructions for how to access documents in the database.
// Currently, only ReadTime is supported.
type ReadOption interface {
apply(*readSettings)
}

type readSettings struct {
readTime time.Time
}

// WithReadOptions specifies constraints for accessing documents from the database,
// e.g. at what time snapshot to read the documents.
// The client uses this value for subsequent reads, unless additional ReadOptions
// are provided.
func (c *Client) WithReadOptions(ro ...ReadOption) *Client {
for _, r := range ro {
r.apply(c.readSettings)
}
return c
}
125 changes: 116 additions & 9 deletions datastore/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package datastore
import (
"context"
"errors"
"fmt"
"sort"
"strings"
"testing"
"time"

"cloud.google.com/go/internal/testutil"
"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -292,6 +294,110 @@ func TestPutMultiTypes(t *testing.T) {
}
}

func TestGetWithReadTime(t *testing.T) {
type ent struct {
A int
}
tm := time.Now()
k := NameKey("testKind", "testReadTime", nil)
e := &pb.Entity{
Key: keyToProto(k),
Properties: map[string]*pb.Value{
"A": {ValueType: &pb.Value_IntegerValue{IntegerValue: 1}},
},
}
fakeClient := &fakeDatastoreClient{
lookup: func(req *pb.LookupRequest) (*pb.LookupResponse, error) {
if !req.ReadOptions.GetReadTime().AsTime().Equal(tm) {
return nil, fmt.Errorf("read time mismatch: expected %v, got %v", tm,
req.ReadOptions.GetReadTime())
}

return &pb.LookupResponse{
Found: []*pb.EntityResult{
{
Entity: e,
Version: 1,
},
},
}, nil
},
}

client := &Client{
client: fakeClient,
readSettings: &readSettings{},
}

ctx := context.Background()
client.WithReadOptions(ReadTime(tm))
dst := &ent{}
err := client.Get(ctx, k, dst)
if err != nil {
t.Fatalf("Get() with ReadTime failed: %v\n", err)
}
}

func TestGetMultiWithReadTime(t *testing.T) {
type ent struct {
A int
}

tm := time.Now()
k := []*Key{
NameKey("testKind", "testReadTime", nil),
NameKey("testKind", "testReadTime2", nil),
}

e := &pb.Entity{
Key: keyToProto(k[0]),
Properties: map[string]*pb.Value{
"A": {ValueType: &pb.Value_IntegerValue{IntegerValue: 1}},
},
}
e2 := &pb.Entity{
Key: keyToProto(k[1]),
Properties: map[string]*pb.Value{
"A": {ValueType: &pb.Value_IntegerValue{IntegerValue: 1}},
},
}

fakeClient := &fakeDatastoreClient{
lookup: func(req *pb.LookupRequest) (*pb.LookupResponse, error) {

if !req.ReadOptions.GetReadTime().AsTime().Equal(tm) {
return nil, fmt.Errorf("read time mismatch: expected %v, got %v", tm,
req.ReadOptions.GetReadTime())
}

return &pb.LookupResponse{
Found: []*pb.EntityResult{
{
Entity: e,
Version: 1,
}, {
Entity: e2,
Version: 1,
},
},
}, nil
},
}

client := &Client{
client: fakeClient,
readSettings: &readSettings{},
}

ctx := context.Background()
client.WithReadOptions(ReadTime(tm))
dst := make([]*ent, len(k))
err := client.GetMulti(ctx, k, dst)
if err != nil {
t.Fatalf("Get() with ReadTime failed: %v\n", err)
}
}

func TestNoIndexOnSliceProperties(t *testing.T) {
// Check that ExcludeFromIndexes is set on the inner elements,
// rather than the top-level ArrayValue value.
Expand Down Expand Up @@ -500,7 +606,8 @@ func TestDeferred(t *testing.T) {
},
}
client := &Client{
client: fakeClient,
client: fakeClient,
readSettings: &readSettings{},
}

ctx := context.Background()
Expand Down Expand Up @@ -620,15 +727,15 @@ func TestDeferredMissing(t *testing.T) {
}

func TestGetWithNilKey(t *testing.T) {
client := &Client{}
client := &Client{readSettings: &readSettings{}}
err := client.Get(context.Background(), nil, []Property{})
if err != ErrInvalidKey {
t.Fatalf("want ErrInvalidKey, got %v", err)
}
}

func TestGetMultiWithNilKey(t *testing.T) {
client := &Client{}
client := &Client{readSettings: &readSettings{}}
dest := make([]PropertyList, 1)
err := client.GetMulti(context.Background(), []*Key{nil}, dest)
if me, ok := err.(MultiError); !ok {
Expand All @@ -639,15 +746,15 @@ func TestGetMultiWithNilKey(t *testing.T) {
}

func TestGetWithIncompleteKey(t *testing.T) {
client := &Client{}
client := &Client{readSettings: &readSettings{}}
err := client.Get(context.Background(), &Key{Kind: "testKind"}, []Property{})
if err == nil {
t.Fatalf("want err, got nil")
}
}

func TestGetMultiWithIncompleteKey(t *testing.T) {
client := &Client{}
client := &Client{readSettings: &readSettings{}}
dest := make([]PropertyList, 1)
err := client.GetMulti(context.Background(), []*Key{{Kind: "testKind"}}, dest)
if me, ok := err.(MultiError); !ok {
Expand All @@ -658,15 +765,15 @@ func TestGetMultiWithIncompleteKey(t *testing.T) {
}

func TestDeleteWithNilKey(t *testing.T) {
client := &Client{}
client := &Client{readSettings: &readSettings{}}
err := client.Delete(context.Background(), nil)
if err != ErrInvalidKey {
t.Fatalf("want ErrInvalidKey, got %v", err)
}
}

func TestDeleteMultiWithNilKey(t *testing.T) {
client := &Client{}
client := &Client{readSettings: &readSettings{}}
err := client.DeleteMulti(context.Background(), []*Key{nil})
if me, ok := err.(MultiError); !ok {
t.Fatalf("want MultiError, got %v", err)
Expand All @@ -676,15 +783,15 @@ func TestDeleteMultiWithNilKey(t *testing.T) {
}

func TestDeleteWithIncompleteKey(t *testing.T) {
client := &Client{}
client := &Client{readSettings: &readSettings{}}
err := client.Delete(context.Background(), &Key{Kind: "testKind"})
if err == nil {
t.Fatalf("want err, got nil")
}
}

func TestDeleteMultiWithIncompleteKey(t *testing.T) {
client := &Client{}
client := &Client{readSettings: &readSettings{}}
err := client.DeleteMulti(context.Background(), []*Key{{Kind: "testKind"}})
if me, ok := err.(MultiError); !ok {
t.Fatalf("want MultiError, got %v", err)
Expand Down
31 changes: 30 additions & 1 deletion datastore/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,34 @@ func TestIntegration_Basics(t *testing.T) {
}
}

func TestIntegration_GetWithReadTime(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*20)
client := newTestClient(ctx, t)
defer client.Close()

type X struct {
I int
S string
T time.Time
U interface{}
}

x0 := X{66, "99", timeNow.Truncate(time.Millisecond), "X"}
k, err := client.Put(ctx, IncompleteKey("BasicsX", nil), &x0)
if err != nil {
t.Fatalf("client.Put: %v", err)
}
x1 := X{}
client.WithReadOptions(ReadTime(time.Now()))
err = client.Get(ctx, k, &x1)
if err != nil {
t.Errorf("client.Get: %v", err)
}

// Cleanup
_ = client.Delete(ctx, k)
}

func TestIntegration_TopLevelKeyLoaded(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*20)
client := newTestClient(ctx, t)
Expand Down Expand Up @@ -292,7 +320,8 @@ func TestIntegration_GetMulti(t *testing.T) {
if _, err := client.PutMulti(ctx, srcKeys, src); err != nil {
t.Error(err)
}
err := client.GetMulti(ctx, dstKeys, dst)

err := client.WithReadOptions(ReadTime(time.Now())).GetMulti(ctx, dstKeys, dst)
if err == nil {
t.Errorf("client.GetMulti got %v, expected error", err)
}
Expand Down
Loading

0 comments on commit 9240741

Please sign in to comment.