Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When executing a query with matrix items, if non key-column quals are used which filter the matrix item list, they must be included in the cache key. Closes #402 #403

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,14 +367,13 @@ func (p *Plugin) executeForConnection(ctx context.Context, req *proto.ExecuteReq
executeSpan.End()
}()

log.Printf("[TRACE] GetMatrixItem")

// get the matrix item
queryData, err := newQueryData(connectionCallId, p, queryContext, table, connectionData, executeData, outputChan)
if err != nil {
return err
}

// get the matrix item
log.Printf("[TRACE] GetMatrixItem")
var matrixItem []map[string]interface{}
if table.GetMatrixItem != nil {
matrixItem = table.GetMatrixItem(ctx, connectionData.Connection)
Expand All @@ -389,7 +388,7 @@ func (p *Plugin) executeForConnection(ctx context.Context, req *proto.ExecuteReq
limit := queryContext.GetLimit()

// convert qual map to type used by cache
cacheQualMap := queryData.Quals.ToProtoQualMap()
cacheQualMap := queryData.getCacheQualMap()
// build cache request
cacheRequest := &query_cache.CacheRequest{
Table: table.Name,
Expand Down
20 changes: 20 additions & 0 deletions plugin/query_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ type QueryData struct {
// when executing parent child list calls, we cache the parent list result in the query data passed to the child list call
parentItem interface{}
filteredMatrix []map[string]interface{}
// column quals which were used to filter the matrix
filteredMatrixColumns []string

// ttl for the execute call
cacheTtl int64
Expand Down Expand Up @@ -313,6 +315,10 @@ func (d *QueryData) filterMatrixItems() {
// if there IS a single equals qual which DOES NOT match this matrix item, exclude the matrix item
if matrixQuals.SingleEqualsQual() {
includeMatrixItem = d.shouldIncludeMatrixItem(matrixQuals, val)
// store this column - we will need this when building a cache key
if !includeMatrixItem {
d.filteredMatrixColumns = append(d.filteredMatrixColumns, col)
}
}
} else {
log.Printf("[TRACE] quals found for matrix column: %s", col)
Expand Down Expand Up @@ -676,3 +682,17 @@ func (d *QueryData) waitForRowsToComplete(rowWg *sync.WaitGroup, rowChan chan *p
logging.DisplayProfileData(10 * time.Millisecond)
close(rowChan)
}

// build a map of all quals to include in the cache key
// this will include all key column quals, and also any quals which were used to filter the matrix items
func (d *QueryData) getCacheQualMap() map[string]*proto.Quals {
res := d.Quals.ToProtoQualMap()
// now add in any additional (non-keycolumn) quals which were used to folter the matrix
for _, col := range d.filteredMatrixColumns {
if _, ok := res[col]; !ok {
log.Printf("[TRACE] getCacheQualMap - adding non-key column qual %s as it was used to filter the matrix items", col)
res[col] = d.QueryContext.UnsafeQuals[col]
}
}
return res
}
34 changes: 5 additions & 29 deletions plugin/table_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,11 @@ func (t *Table) doList(ctx context.Context, queryData *QueryData, listCall Hydra

rd := newRowData(queryData, nil)

if len(queryData.Matrix) == 0 {
// if a matrix is defined, run listForEach
if len(queryData.Matrix) > 0 {
log.Printf("[TRACE] doList: matrix len %d - calling listForEach", len(queryData.Matrix))
t.listForEach(ctx, queryData, listCall)
} else {
log.Printf("[TRACE] doList: no matrix item")

// we cannot retry errors in the list hydrate function after streaming has started
Expand All @@ -483,8 +487,6 @@ func (t *Table) doList(ctx context.Context, queryData *QueryData, listCall Hydra
log.Printf("[WARN] doList callHydrateWithRetries (%s) returned err %s", queryData.connectionCallId, err.Error())
queryData.streamError(err)
}
} else {
t.listForEach(ctx, queryData, listCall)
}
}

Expand All @@ -500,14 +502,6 @@ func (t *Table) listForEach(ctx context.Context, queryData *QueryData, listCall
// NOTE - we use the filtered matrix - which means we may not actually run any hydrate calls
// if the quals have filtered out all matrix items (e.g. select where region = 'invalid')
for _, matrixItem := range queryData.filteredMatrix {

// check whether there is a single equals qual for each matrix item property and if so, check whether
// the matrix item property values satisfy the conditions
if !t.matrixItemMeetsQuals(matrixItem, queryData) {
log.Printf("[INFO] listForEach: matrix item item does not meet quals, %v\n", matrixItem)
continue
}

// create a context with the matrixItem
fetchContext := context.WithValue(ctx, context_key.MatrixItem, matrixItem)
wg.Add(1)
Expand Down Expand Up @@ -538,21 +532,3 @@ func (t *Table) listForEach(ctx context.Context, queryData *QueryData, listCall
}
wg.Wait()
}

func (t *Table) matrixItemMeetsQuals(matrixItem map[string]interface{}, queryData *QueryData) bool {
// for the purposes of optimisation , assume matrix item properties correspond to column names
// if this is NOT the case, it will not fail, but this optimisation will not do anything
for columnName, metadataValue := range matrixItem {
// is there a single equals qual for this column
if qualValue, ok := queryData.Quals[columnName]; ok {
if qualValue.SingleEqualsQual() {
// get the underlying qual value
requiredValue := grpc.GetQualValue(qualValue.Quals[0].Value)
if requiredValue != metadataValue {
return false
}
}
}
}
return true
}