Skip to content

Commit

Permalink
feat: initial grpc blocking queries
Browse files Browse the repository at this point in the history
  • Loading branch information
DanStough committed May 23, 2023
1 parent d34bde0 commit 11b1dd2
Show file tree
Hide file tree
Showing 40 changed files with 1,365 additions and 1,148 deletions.
208 changes: 208 additions & 0 deletions agent/blockingquery/blockingquery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package blockingquery

import (
"context"
"errors"
"fmt"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/go-memdb"

"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/lib"
)

// Sentinel errors that must be used with blockingQuery
var (
ErrNotFound = fmt.Errorf("no data found for query")
ErrNotChanged = fmt.Errorf("data did not change for query")
)

// QueryFn is used to perform a query operation. See Server.blockingQuery for
// the requirements of this function.
type QueryFn func(memdb.WatchSet, *state.Store) error

// RequestOptions are options used by Server.blockingQuery to modify the
// behaviour of the query operation, or to populate response metadata.
type RequestOptions interface {
GetToken() string
GetMinQueryIndex() uint64
GetMaxQueryTime() (time.Duration, error)
GetRequireConsistent() bool
}

// ResponseMeta is an interface used to populate the response struct
// with metadata about the query and the state of the server.
type ResponseMeta interface {
SetLastContact(time.Duration)
SetKnownLeader(bool)
GetIndex() uint64
SetIndex(uint64)
SetResultsFilteredByACLs(bool)
}

// FSMServer is interface into the stateful components of a Consul server, such
// as memdb or raft leadership.
type FSMServer interface {
ConsistentRead() error
DecrementBlockingQueries() uint64
GetShutdownChannel() chan struct{}
GetState() *state.Store
IncrementBlockingQueries() uint64
RPCQueryTimeout(time.Duration) time.Duration
SetQueryMeta(ResponseMeta, string)
}

// Query performs a blocking query if opts.GetMinQueryIndex is
// greater than 0, otherwise performs a non-blocking query. Blocking queries will
// block until responseMeta.Index is greater than opts.GetMinQueryIndex,
// or opts.GetMaxQueryTime is reached. Non-blocking queries return immediately
// after performing the query.
//
// If opts.GetRequireConsistent is true, blockingQuery will first verify it is
// still the cluster leader before performing the query.
//
// The query function is expected to be a closure that has access to responseMeta
// so that it can set the Index. The actual result of the query is opaque to blockingQuery.
//
// The query function can return ErrNotFound, which is a sentinel error. Returning
// ErrNotFound indicates that the query found no results, which allows
// blockingQuery to keep blocking until the query returns a non-nil error.
// The query function must take care to set the actual result of the query to
// nil in these cases, otherwise when blockingQuery times out it may return
// a previous result. ErrNotFound will never be returned to the caller, it is
// converted to nil before returning.
//
// The query function can return ErrNotChanged, which is a sentinel error. This
// can only be returned on calls AFTER the first call, as it would not be
// possible to detect the absence of a change on the first call. Returning
// ErrNotChanged indicates that the query results are identical to the prior
// results which allows blockingQuery to keep blocking until the query returns
// a real changed result.
//
// The query function must take care to ensure the actual result of the query
// is either left unmodified or explicitly left in a good state before
// returning, otherwise when blockingQuery times out it may return an
// incomplete or unexpected result. ErrNotChanged will never be returned to the
// caller, it is converted to nil before returning.
//
// If query function returns any other error, the error is returned to the caller
// immediately.
//
// The query function must follow these rules:
//
// 1. to access data it must use the passed in state.Store.
// 2. it must set the responseMeta.Index to an index greater than
// opts.GetMinQueryIndex if the results return by the query have changed.
// 3. any channels added to the memdb.WatchSet must unblock when the results
// returned by the query have changed.
//
// To ensure optimal performance of the query, the query function should make a
// best-effort attempt to follow these guidelines:
//
// 1. only set responseMeta.Index to an index greater than
// opts.GetMinQueryIndex when the results returned by the query have changed.
// 2. any channels added to the memdb.WatchSet should only unblock when the
// results returned by the query have changed.
func Query(
fsmServer FSMServer,
requestOpts RequestOptions,
responseMeta ResponseMeta,
query QueryFn,
) error {
var ctx context.Context = &lib.StopChannelContext{StopCh: fsmServer.GetShutdownChannel()}

metrics.IncrCounter([]string{"rpc", "query"}, 1)

minQueryIndex := requestOpts.GetMinQueryIndex()
// Perform a non-blocking query
if minQueryIndex == 0 {
if requestOpts.GetRequireConsistent() {
if err := fsmServer.ConsistentRead(); err != nil {
return err
}
}

var ws memdb.WatchSet
err := query(ws, fsmServer.GetState())
fsmServer.SetQueryMeta(responseMeta, requestOpts.GetToken())
if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotChanged) {
return nil
}
return err
}

maxQueryTimeout, err := requestOpts.GetMaxQueryTime()
if err != nil {
return err
}
timeout := fsmServer.RPCQueryTimeout(maxQueryTimeout)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

count := fsmServer.IncrementBlockingQueries()
metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(count))
// decrement the count when the function returns.
defer fsmServer.DecrementBlockingQueries()

var (
notFound bool
ranOnce bool
)

for {
if requestOpts.GetRequireConsistent() {
if err := fsmServer.ConsistentRead(); err != nil {
return err
}
}

// Operate on a consistent set of state. This makes sure that the
// abandon channel goes with the state that the caller is using to
// build watches.
store := fsmServer.GetState()

ws := memdb.NewWatchSet()
// This channel will be closed if a snapshot is restored and the
// whole state store is abandoned.
ws.Add(store.AbandonCh())

err := query(ws, store)
fsmServer.SetQueryMeta(responseMeta, requestOpts.GetToken())

switch {
case errors.Is(err, ErrNotFound):
if notFound {
// query result has not changed
minQueryIndex = responseMeta.GetIndex()
}
notFound = true
case errors.Is(err, ErrNotChanged):
if ranOnce {
// query result has not changed
minQueryIndex = responseMeta.GetIndex()
}
case err != nil:
return err
}
ranOnce = true

if responseMeta.GetIndex() > minQueryIndex {
return nil
}

// block until something changes, or the timeout
if err := ws.WatchCtx(ctx); err != nil {
// exit if we've reached the timeout, or other cancellation
return nil
}

// exit if the state store has been abandoned
select {
case <-store.AbandonCh():
return nil
default:
}
}
}
4 changes: 4 additions & 0 deletions agent/blockingquery/blockingquery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package blockingquery

// TODO: move tests from the consul package, rpc_test.go, TestServer_blockingQuery
// here using mock for FSMServer w/ structs.QueryOptions and structs.QueryOptions
54 changes: 36 additions & 18 deletions agent/cache-types/peerings.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,24 @@ import (
"context"
"fmt"
"strconv"
"time"

external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/proto/private/pbpeering"
"github.com/mitchellh/hashstructure"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/hashicorp/consul/agent/cache"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/private/pbpeering"
)

// PeeringListName is the recommended name for registration.
const PeeringListName = "peers"

// PeeringListRequest represents the combination of request payload
// and options that would normally be sent over headers.
type PeeringListRequest struct {
Request *pbpeering.PeeringListRequest
structs.QueryOptions
Expand All @@ -32,13 +34,10 @@ func (r *PeeringListRequest) CacheInfo() cache.RequestInfo {
info := cache.RequestInfo{
Token: r.Token,
Datacenter: "",
MinIndex: 0,
Timeout: 0,
MustRevalidate: false,

// OPTIMIZE(peering): Cache.notifyPollingQuery polls at this interval. We need to revisit how that polling works.
// Using an exponential backoff when the result hasn't changed may be preferable.
MaxAge: 1 * time.Second,
MinIndex: r.MinQueryIndex,
Timeout: r.MaxQueryTime,
MaxAge: r.MaxAge,
MustRevalidate: r.MustRevalidate,
}

v, err := hashstructure.Hash([]interface{}{
Expand All @@ -56,7 +55,7 @@ func (r *PeeringListRequest) CacheInfo() cache.RequestInfo {

// Peerings supports fetching the list of peers for a given partition or wildcard-specifier.
type Peerings struct {
RegisterOptionsNoRefresh
RegisterOptionsBlockingRefresh
Client PeeringLister
}

Expand All @@ -67,7 +66,7 @@ type PeeringLister interface {
) (*pbpeering.PeeringListResponse, error)
}

func (t *Peerings) Fetch(_ cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
func (t *Peerings) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult

// The request should be a PeeringListRequest.
Expand All @@ -79,10 +78,17 @@ func (t *Peerings) Fetch(_ cache.FetchOptions, req cache.Request) (cache.FetchRe
"Internal cache failure: request wrong type: %T", req)
}

// Always allow stale - there's no point in hitting leader if the request is
// going to be served from cache and end up arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all
// servers too.
// Lightweight copy this object so that manipulating QueryOptions doesn't race.
dup := *reqReal
reqReal = &dup

// Set the minimum query index to our current index, so we block
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
reqReal.QueryOptions.MaxQueryTime = opts.Timeout

// We allow stale queries here to spread out the RPC load, but peerstream information, including the STATUS,
// will not be returned. Right now this is fine for the watch in proxycfg/mesh_gateway.go,
// but it could be a problem for a future consumer.
reqReal.QueryOptions.SetAllowStale(true)

ctx, err := external.ContextWithQueryOptions(context.Background(), reqReal.QueryOptions)
Expand All @@ -91,7 +97,8 @@ func (t *Peerings) Fetch(_ cache.FetchOptions, req cache.Request) (cache.FetchRe
}

// Fetch
reply, err := t.Client.PeeringList(ctx, reqReal.Request)
var header metadata.MD
reply, err := t.Client.PeeringList(ctx, reqReal.Request, grpc.Header(&header))
if err != nil {
// Return an empty result if the error is due to peering being disabled.
// This allows mesh gateways to receive an update and confirm that the watch is set.
Expand All @@ -103,8 +110,19 @@ func (t *Peerings) Fetch(_ cache.FetchOptions, req cache.Request) (cache.FetchRe
return result, err
}

// This first case is using the legacy index field
// It should be removed in a future version in favor of the index from QueryMeta
if reply.OBSOLETE_Index != 0 {
result.Index = reply.OBSOLETE_Index
} else {
meta, err := external.QueryMetaFromGRPCMeta(header)
if err != nil {
return result, fmt.Errorf("could not convert gRPC metadata to query meta: %w", err)
}
result.Index = meta.GetIndex()
}

result.Value = reply
result.Index = reply.Index

return result, nil
}
Loading

0 comments on commit 11b1dd2

Please sign in to comment.