Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(datastore): Ignore field mismatch errors #8694

Merged
merged 9 commits into from
Jul 30, 2024
58 changes: 54 additions & 4 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,54 @@ func checkMultiArg(v reflect.Value) (m multiArgType, elemType reflect.Type) {
return multiArgTypeInvalid, nil
}

// IgnoreFieldMismatch allows ignoring ErrFieldMismatch error while
// reading or querying data.
// WARNING: Ignoring ErrFieldMismatch can cause data loss
func (c *Client) IgnoreFieldMismatch() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider expressing this as a client specific ClientIOption. Here is an example from storage:

func WithXMLReads() option.ClientOption {
return &withReadAPI{useJSON: false}
}
type withReadAPI struct {
internaloption.EmbeddableAdapter
useJSON bool
}
func (w *withReadAPI) ApplyStorageOpt(c *storageConfig) {
c.useJSONforReads = w.useJSON
c.readAPIWasSet = true
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the PR

c.readSettings.ignoreFieldMismatchErrors = true
}

func (c *Client) processFieldMismatchError(err error) error {
if c.readSettings == nil || !c.readSettings.ignoreFieldMismatchErrors {
return err
}
return ignoreFieldMismatchErrs(err)
}

func ignoreFieldMismatchErrs(err error) error {
if err == nil {
return err
}

multiErr, isMultiErr := err.(MultiError)
if isMultiErr {
foundErr := false
for i, e := range multiErr {
multiErr[i] = ignoreFieldMismatchErr(e)
if multiErr[i] != nil {
foundErr = true
}
}
if !foundErr {
return nil
}
return multiErr
}

return ignoreFieldMismatchErr(err)
}

func ignoreFieldMismatchErr(err error) error {
if err == nil {
return err
}
_, isFieldMismatchErr := err.(*ErrFieldMismatch)
if isFieldMismatchErr {
return nil
}
return err
}

// Close closes the Client. Call Close to clean up resources when done with the
// Client.
func (c *Client) Close() error {
Expand Down Expand Up @@ -395,9 +443,9 @@ func (c *Client) Get(ctx context.Context, key *Key, dst interface{}) (err error)

err = c.get(ctx, []*Key{key}, []interface{}{dst}, opts)
if me, ok := err.(MultiError); ok {
return me[0]
return c.processFieldMismatchError(me[0])
}
return err
return c.processFieldMismatchError(err)
}

// GetMulti is a batch version of Get.
Expand Down Expand Up @@ -426,7 +474,8 @@ func (c *Client) GetMulti(ctx context.Context, keys []*Key, dst interface{}) (er
}
}

return c.get(ctx, keys, dst, opts)
getErr := c.get(ctx, keys, dst, opts)
return c.processFieldMismatchError(getErr)
}

func (c *Client) get(ctx context.Context, keys []*Key, dst interface{}, opts *pb.ReadOptions) error {
Expand Down Expand Up @@ -814,7 +863,8 @@ type ReadOption interface {
}

type readSettings struct {
readTime time.Time
readTime time.Time
ignoreFieldMismatchErrors bool
}

// WithReadOptions specifies constraints for accessing documents from the database,
Expand Down
110 changes: 110 additions & 0 deletions datastore/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,116 @@ func TestIntegration_Basics(t *testing.T) {
}
}

type OldX struct {
I int
J int
}
type NewX struct {
I int
}

func TestIntegration_IgnoreFieldMismatch(t *testing.T) {
ctx := context.Background()
client := newTestClient(ctx, t)
client.IgnoreFieldMismatch()
defer client.Close()

// Save entities with an extra field
keysOld := []*Key{
NameKey("X", "x1", nil),
NameKey("X", "x2", nil),
}
entitiesOld := []OldX{
{I: 10, J: 20},
{I: 30, J: 40},
}
_, err := client.PutMulti(ctx, keysOld, entitiesOld)
if err != nil {
t.Fatalf("Failed to save: %v\n", err)
}

// Save entities without extra field
keysNew := []*Key{
NameKey("X", "x3", nil),
}
entitiesNew := []NewX{
{I: 50},
}
_, err = client.PutMulti(ctx, keysNew, entitiesNew)
if err != nil {
t.Fatalf("Failed to save: %v\n", err)
}

keys := append(keysOld, keysNew...)
var wants []NewX
for _, oldX := range entitiesOld {
wants = append(wants, []NewX{{I: oldX.I}}...)
}
wants = append(wants, entitiesNew...)

// FieldMismatch ignored in Next
query := NewQuery("X").FilterField("I", ">=", 10)
it := client.Run(ctx, query)
resIndex := 0
for {
var newX NewX
_, err := it.Next(&newX)
if err == iterator.Done {
break
}
if err != nil {
t.Fatalf("Next got: %v, want: nil\n", err)
}
if newX.I != wants[resIndex].I {
t.Fatalf("Next got: %v, want: %v\n", newX.I, wants[resIndex].I)
}
resIndex++
}

// FieldMismatch ignored in Get
var getX NewX
err = client.Get(ctx, keys[0], &getX)
compareIgnoreFieldMismatchResults(t, []NewX{wants[0]}, []NewX{getX}, err, "Get")

// FieldMismatch ignored in GetAll
var getAllX []NewX
_, err = client.GetAll(ctx, query, &getAllX)
compareIgnoreFieldMismatchResults(t, wants, getAllX, err, "GetAll")

// FieldMismatch ignored in GetMulti
getMultiX := make([]NewX, 3)
err = client.GetMulti(ctx, keys, getMultiX)
compareIgnoreFieldMismatchResults(t, wants, getMultiX, err, "GetMulti")

tx, err := client.NewTransaction(ctx)
if err != nil {
t.Fatalf("tx.GetMulti got: %v, want: nil\n", err)
}

// FieldMismatch ignored in tx.Get
var txGetX NewX
err = tx.Get(keys[0], &txGetX)
compareIgnoreFieldMismatchResults(t, []NewX{wants[0]}, []NewX{txGetX}, err, "tx.Get")

// FieldMismatch ignored in tx.GetMulti
txGetMultiX := make([]NewX, 3)
err = tx.GetMulti(keys, txGetMultiX)
compareIgnoreFieldMismatchResults(t, wants, txGetMultiX, err, "tx.GetMulti")

tx.Commit()

}

func compareIgnoreFieldMismatchResults(t *testing.T, wantX []NewX, gotX []NewX, err error, errPrefix string) {
if err != nil {
t.Fatalf("%v got: %v, want: nil\n", errPrefix, err)
}
for resIndex := 0; resIndex < len(wantX); resIndex++ {
if wantX[resIndex].I != gotX[resIndex].I {
t.Fatalf("%v %v: got: %v, want: %v\n", errPrefix, resIndex, wantX[resIndex].I, gotX[resIndex].I)
}
}
}
func TestIntegration_GetWithReadTime(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
client := newTestClient(ctx, t)
Expand Down
4 changes: 2 additions & 2 deletions datastore/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ func (c *Client) GetAll(ctx context.Context, q *Query, dst interface{}) (keys []
}
keys = append(keys, k)
}
return keys, errFieldMismatch
return keys, c.processFieldMismatchError(errFieldMismatch)
}

// Run runs the given query in the given context.
Expand Down Expand Up @@ -874,7 +874,7 @@ func (t *Iterator) Next(dst interface{}) (k *Key, err error) {
if dst != nil && !t.keysOnly {
err = loadEntityProto(dst, e)
}
return k, err
return k, t.client.processFieldMismatchError(err)
}

func (t *Iterator) next() (*Key, *pb.Entity, error) {
Expand Down
7 changes: 4 additions & 3 deletions datastore/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,9 @@ func (t *Transaction) Get(key *Key, dst interface{}) (err error) {
}
err = t.client.get(t.ctx, []*Key{key}, []interface{}{dst}, opts)
if me, ok := err.(MultiError); ok {
return me[0]
return t.client.processFieldMismatchError(me[0])
}
return err
return t.client.processFieldMismatchError(err)
}

// GetMulti is a batch version of Get.
Expand All @@ -307,7 +307,8 @@ func (t *Transaction) GetMulti(keys []*Key, dst interface{}) (err error) {
opts := &pb.ReadOptions{
ConsistencyType: &pb.ReadOptions_Transaction{Transaction: t.id},
}
return t.client.get(t.ctx, keys, dst, opts)
getErr := t.client.get(t.ctx, keys, dst, opts)
return t.client.processFieldMismatchError(getErr)
}

// Put is the transaction-specific version of the package function Put.
Expand Down
Loading