Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(datastore): adds snapshot reads #6755

Merged
merged 35 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
0c11f2f
feat(datastore): adds snapshot reads
telpirion Sep 27, 2022
36d2fd0
add unit test
telpirion Sep 27, 2022
7df213b
Merge branch 'main' into datastore-snapshot
telpirion Sep 28, 2022
5c4afa0
per linter
telpirion Sep 28, 2022
90d43a6
Merge branch 'main' into datastore-snapshot
telpirion Sep 28, 2022
f882e68
fixed unit test
telpirion Sep 28, 2022
417b9eb
Merge branch 'main' into datastore-snapshot
telpirion Sep 28, 2022
40df64d
Merge branch 'main' into datastore-snapshot
telpirion Sep 29, 2022
92d9df5
Merge branch 'main' into datastore-snapshot
telpirion Oct 7, 2022
a7fdf85
per reviewer
telpirion Oct 7, 2022
3ae157f
Merge branch 'main' into datastore-snapshot
telpirion Oct 7, 2022
5947de5
per reviewer
telpirion Oct 7, 2022
de1f888
iter
telpirion Oct 7, 2022
80e5b8d
unit tests
telpirion Oct 10, 2022
1cc3376
unit test
telpirion Oct 10, 2022
97d1235
Merge branch 'main' into datastore-snapshot
telpirion Oct 10, 2022
c8563aa
Merge branch 'main' into datastore-snapshot
telpirion Oct 10, 2022
28e57a0
fix test
telpirion Oct 10, 2022
464dfb7
Merge branch 'main' into datastore-snapshot
telpirion Oct 11, 2022
8d9625b
Merge branch 'main' into datastore-snapshot
telpirion Oct 12, 2022
ba0ab74
per reviewer
telpirion Oct 12, 2022
06ebe59
Merge branch 'main' into datastore-snapshot
telpirion Oct 13, 2022
6279db0
Merge branch 'main' into datastore-snapshot
telpirion Oct 17, 2022
22c400a
Merge branch 'main' into datastore-snapshot
telpirion Oct 18, 2022
4a8a9b2
per reviewer
telpirion Oct 18, 2022
71836ab
per reviewer
telpirion Oct 18, 2022
e704e9a
Merge branch 'main' into datastore-snapshot
telpirion Oct 18, 2022
cff5c69
Merge branch 'main' into datastore-snapshot
telpirion Oct 18, 2022
2df5eee
Merge branch 'main' into datastore-snapshot
telpirion Oct 19, 2022
33d3aa1
Merge branch 'main' into datastore-snapshot
telpirion Oct 20, 2022
db71173
fix
telpirion Oct 20, 2022
189d957
Merge branch 'main' into datastore-snapshot
telpirion Oct 24, 2022
ec3e532
appease the tests
telpirion Oct 24, 2022
9fbc168
Merge branch 'main' into datastore-snapshot
telpirion Oct 26, 2022
590fba5
Merge branch 'main' into datastore-snapshot
gcf-merge-on-green[bot] Oct 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 63 additions & 8 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
"log"
"os"
"reflect"
"time"

"cloud.google.com/go/internal/trace"
"google.golang.org/api/option"
"google.golang.org/api/transport"
gtransport "google.golang.org/api/transport/grpc"
pb "google.golang.org/genproto/googleapis/datastore/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
Expand All @@ -53,9 +55,10 @@ const resourcePrefixHeader = "google-cloud-resource-prefix"

// Client is a client for reading and writing data in a datastore dataset.
type Client struct {
connPool gtransport.ConnPool
client pb.DatastoreClient
dataset string // Called dataset by the datastore API, synonym for project ID.
connPool gtransport.ConnPool
client pb.DatastoreClient
dataset string // Called dataset by the datastore API, synonym for project ID.
readSettings *readSettings
telpirion marked this conversation as resolved.
Show resolved Hide resolved
}

// NewClient creates a new Client for a given dataset. If the project ID is
Expand Down Expand Up @@ -118,9 +121,10 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
return nil, fmt.Errorf("dialing: %w", err)
}
return &Client{
connPool: connPool,
client: newDatastoreClient(connPool, projectID),
dataset: projectID,
connPool: connPool,
client: newDatastoreClient(connPool, projectID),
dataset: projectID,
readSettings: &readSettings{},
}, nil
}

Expand Down Expand Up @@ -348,7 +352,17 @@ func (c *Client) Get(ctx context.Context, key *Key, dst interface{}) (err error)
if dst == nil { // get catches nil interfaces; we need to catch nil ptr here
return ErrInvalidEntityType
}
err = c.get(ctx, []*Key{key}, []interface{}{dst}, nil)

var opts *pb.ReadOptions
if !c.readSettings.readTime.IsZero() {
opts = &pb.ReadOptions{
ConsistencyType: &pb.ReadOptions_ReadTime{
ReadTime: timestamppb.New(c.readSettings.readTime),
},
}
}

err = c.get(ctx, []*Key{key}, []interface{}{dst}, opts)
if me, ok := err.(MultiError); ok {
return me[0]
}
Expand All @@ -371,7 +385,16 @@ func (c *Client) GetMulti(ctx context.Context, keys []*Key, dst interface{}) (er
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.GetMulti")
defer func() { trace.EndSpan(ctx, err) }()

return c.get(ctx, keys, dst, nil)
var opts *pb.ReadOptions
if c.readSettings != nil && !c.readSettings.readTime.IsZero() {
opts = &pb.ReadOptions{
ConsistencyType: &pb.ReadOptions_ReadTime{
ReadTime: timestamppb.New(c.readSettings.readTime),
},
}
}

return c.get(ctx, keys, dst, opts)
}

func (c *Client) get(ctx context.Context, keys []*Key, dst interface{}, opts *pb.ReadOptions) error {
Expand Down Expand Up @@ -679,3 +702,35 @@ func (c *Client) Mutate(ctx context.Context, muts ...*Mutation) (ret []*Key, err
}
return ret, nil
}

// ReadTime specifies a snapshot (time) of the database to read.
func ReadTime(t time.Time) ReadOption {
return docReadTime(t)
}

type docReadTime time.Time

func (drt docReadTime) apply(rs *readSettings) {
rs.readTime = time.Time(drt)
}

// ReadOption provides specific instructions for how to access documents in the database.
// Currently, only ReadTime is supported.
type ReadOption interface {
apply(*readSettings)
}

type readSettings struct {
readTime time.Time
}

// WithReadOptions specifies constraints for accessing documents from the database,
// e.g. at what time snapshot to read the documents.
// The client uses this value for subsequent reads, unless additional ReadOptions
// are provided.
func (c *Client) WithReadOptions(ro ...ReadOption) *Client {
for _, r := range ro {
r.apply(c.readSettings)
}
return c
}
125 changes: 116 additions & 9 deletions datastore/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package datastore
import (
"context"
"errors"
"fmt"
"sort"
"strings"
"testing"
"time"

"cloud.google.com/go/internal/testutil"
"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -292,6 +294,110 @@ func TestPutMultiTypes(t *testing.T) {
}
}

func TestGetWithReadTime(t *testing.T) {
type ent struct {
A int
}
tm := time.Now()
k := NameKey("testKind", "testReadTime", nil)
e := &pb.Entity{
Key: keyToProto(k),
Properties: map[string]*pb.Value{
"A": {ValueType: &pb.Value_IntegerValue{IntegerValue: 1}},
},
}
fakeClient := &fakeDatastoreClient{
lookup: func(req *pb.LookupRequest) (*pb.LookupResponse, error) {
if !req.ReadOptions.GetReadTime().AsTime().Equal(tm) {
return nil, fmt.Errorf("read time mismatch: expected %v, got %v", tm,
req.ReadOptions.GetReadTime())
}

return &pb.LookupResponse{
telpirion marked this conversation as resolved.
Show resolved Hide resolved
Found: []*pb.EntityResult{
{
Entity: e,
Version: 1,
},
},
}, nil
},
}

client := &Client{
client: fakeClient,
readSettings: &readSettings{},
}

ctx := context.Background()
client.WithReadOptions(ReadTime(tm))
dst := &ent{}
err := client.Get(ctx, k, dst)
if err != nil {
t.Fatalf("Get() with ReadTime failed: %v\n", err)
}
}

func TestGetMultiWithReadTime(t *testing.T) {
type ent struct {
A int
}

tm := time.Now()
k := []*Key{
NameKey("testKind", "testReadTime", nil),
NameKey("testKind", "testReadTime2", nil),
}

e := &pb.Entity{
Key: keyToProto(k[0]),
Properties: map[string]*pb.Value{
"A": {ValueType: &pb.Value_IntegerValue{IntegerValue: 1}},
},
}
e2 := &pb.Entity{
Key: keyToProto(k[1]),
Properties: map[string]*pb.Value{
"A": {ValueType: &pb.Value_IntegerValue{IntegerValue: 1}},
},
}

fakeClient := &fakeDatastoreClient{
lookup: func(req *pb.LookupRequest) (*pb.LookupResponse, error) {

if !req.ReadOptions.GetReadTime().AsTime().Equal(tm) {
return nil, fmt.Errorf("read time mismatch: expected %v, got %v", tm,
req.ReadOptions.GetReadTime())
}

return &pb.LookupResponse{
Found: []*pb.EntityResult{
{
Entity: e,
Version: 1,
}, {
Entity: e2,
Version: 1,
},
},
}, nil
},
}

client := &Client{
client: fakeClient,
readSettings: &readSettings{},
}

ctx := context.Background()
client.WithReadOptions(ReadTime(tm))
dst := make([]*ent, len(k))
err := client.GetMulti(ctx, k, dst)
if err != nil {
t.Fatalf("Get() with ReadTime failed: %v\n", err)
}
}

func TestNoIndexOnSliceProperties(t *testing.T) {
// Check that ExcludeFromIndexes is set on the inner elements,
// rather than the top-level ArrayValue value.
Expand Down Expand Up @@ -500,7 +606,8 @@ func TestDeferred(t *testing.T) {
},
}
client := &Client{
client: fakeClient,
client: fakeClient,
readSettings: &readSettings{},
}

ctx := context.Background()
Expand Down Expand Up @@ -620,15 +727,15 @@ func TestDeferredMissing(t *testing.T) {
}

func TestGetWithNilKey(t *testing.T) {
client := &Client{}
client := &Client{readSettings: &readSettings{}}
telpirion marked this conversation as resolved.
Show resolved Hide resolved
err := client.Get(context.Background(), nil, []Property{})
if err != ErrInvalidKey {
t.Fatalf("want ErrInvalidKey, got %v", err)
}
}

func TestGetMultiWithNilKey(t *testing.T) {
client := &Client{}
client := &Client{readSettings: &readSettings{}}
dest := make([]PropertyList, 1)
err := client.GetMulti(context.Background(), []*Key{nil}, dest)
if me, ok := err.(MultiError); !ok {
Expand All @@ -639,15 +746,15 @@ func TestGetMultiWithNilKey(t *testing.T) {
}

func TestGetWithIncompleteKey(t *testing.T) {
client := &Client{}
client := &Client{readSettings: &readSettings{}}
err := client.Get(context.Background(), &Key{Kind: "testKind"}, []Property{})
if err == nil {
t.Fatalf("want err, got nil")
}
}

func TestGetMultiWithIncompleteKey(t *testing.T) {
client := &Client{}
client := &Client{readSettings: &readSettings{}}
dest := make([]PropertyList, 1)
err := client.GetMulti(context.Background(), []*Key{{Kind: "testKind"}}, dest)
if me, ok := err.(MultiError); !ok {
Expand All @@ -658,15 +765,15 @@ func TestGetMultiWithIncompleteKey(t *testing.T) {
}

func TestDeleteWithNilKey(t *testing.T) {
client := &Client{}
client := &Client{readSettings: &readSettings{}}
err := client.Delete(context.Background(), nil)
if err != ErrInvalidKey {
t.Fatalf("want ErrInvalidKey, got %v", err)
}
}

func TestDeleteMultiWithNilKey(t *testing.T) {
client := &Client{}
client := &Client{readSettings: &readSettings{}}
err := client.DeleteMulti(context.Background(), []*Key{nil})
if me, ok := err.(MultiError); !ok {
t.Fatalf("want MultiError, got %v", err)
Expand All @@ -676,15 +783,15 @@ func TestDeleteMultiWithNilKey(t *testing.T) {
}

func TestDeleteWithIncompleteKey(t *testing.T) {
client := &Client{}
client := &Client{readSettings: &readSettings{}}
err := client.Delete(context.Background(), &Key{Kind: "testKind"})
if err == nil {
t.Fatalf("want err, got nil")
}
}

func TestDeleteMultiWithIncompleteKey(t *testing.T) {
client := &Client{}
client := &Client{readSettings: &readSettings{}}
err := client.DeleteMulti(context.Background(), []*Key{{Kind: "testKind"}})
if me, ok := err.(MultiError); !ok {
t.Fatalf("want MultiError, got %v", err)
Expand Down
31 changes: 30 additions & 1 deletion datastore/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,34 @@ func TestIntegration_Basics(t *testing.T) {
}
}

func TestIntegration_GetWithReadTime(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*20)
client := newTestClient(ctx, t)
defer client.Close()

type X struct {
I int
S string
T time.Time
U interface{}
}

x0 := X{66, "99", timeNow.Truncate(time.Millisecond), "X"}
k, err := client.Put(ctx, IncompleteKey("BasicsX", nil), &x0)
if err != nil {
t.Fatalf("client.Put: %v", err)
}
x1 := X{}
client.WithReadOptions(ReadTime(time.Now()))
err = client.Get(ctx, k, &x1)
if err != nil {
t.Errorf("client.Get: %v", err)
}

// Cleanup
_ = client.Delete(ctx, k)
}

func TestIntegration_TopLevelKeyLoaded(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*20)
client := newTestClient(ctx, t)
Expand Down Expand Up @@ -292,7 +320,8 @@ func TestIntegration_GetMulti(t *testing.T) {
if _, err := client.PutMulti(ctx, srcKeys, src); err != nil {
t.Error(err)
}
err := client.GetMulti(ctx, dstKeys, dst)

err := client.WithReadOptions(ReadTime(time.Now())).GetMulti(ctx, dstKeys, dst)
if err == nil {
t.Errorf("client.GetMulti got %v, expected error", err)
}
Expand Down
Loading