forked from nautilus/gateway
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cache.go
223 lines (193 loc) · 7.32 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package gateway
import (
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"sync"
	"sync/atomic"
	"time"
)
// In general, "query persistence" is a term for a family of optimizations that involve
// storing some kind of representation of the queries that the client will send. For
// nautilus, this allows for the pre-computation of query plans and can drastically speed
// up response times.
//
// There are a few different strategies when it comes to timing the computation of these
// plans. Each strategy has its own trade-offs and should be carefully considered
//
// Automatic Persisted Queries:
// - client asks for the query associated with a particular hash
// - if the server knows that hash, execute the query plan. if not, return with a known value
// - if the client sees the known value, resend the query with the full query body
// - the server will then calculate the plan and save it for later use
// - if the client sends a known hash along with the query body, the query body is ignored
//
// pros/cons:
// - no need for a build step
// - the client can send any queries they want
//
// StaticPersistedQueries (not implemented here):
// - as part of a build step, the gateway is given the list of queries and associated
// hashes
// - the client only sends the hash with queries
// - if the server recognizes the hash, execute the query. Otherwise, return with an error
//
// pros/cons:
// - need for a separate build step that prepares the queries and shares it with the server
// - tighter control on operations. The client can only send queries that are approved (pre-computed)
// MessageMissingCachedQuery is the string that the server sends when the user assumes that the server knows about
// a cached query plan. Clients that receive this value should re-send the request with the full query body.
const MessageMissingCachedQuery = "PersistedQueryNotFound"
// QueryPlanCache decides when to compute a plan. Implementations are given the planning
// context, an optional hash identifying the query, and the planner to invoke when a
// fresh plan must be computed.
type QueryPlanCache interface {
	// Retrieve returns the query plan for the given context/hash. Implementations may
	// write through *hash to report the hash under which a newly computed plan was
	// cached (see AutomaticQueryPlanCache.Retrieve).
	Retrieve(ctx *PlanningContext, hash *string, planner QueryPlanner) (QueryPlanList, error)
}
// WithNoQueryPlanCache is the default option and disables any persisted query behavior.
// Every request recomputes its plan from scratch.
func WithNoQueryPlanCache() Option {
	// NoQueryPlanCache ignores the hash and always plans fresh
	noCache := &NoQueryPlanCache{}
	return WithQueryPlanCache(noCache)
}
// NoQueryPlanCache will always compute the plan for a query, regardless of the value passed as `hash`.
type NoQueryPlanCache struct{}

// Retrieve just computes the query plan by delegating directly to the planner;
// the hash argument is ignored and nothing is stored.
func (p *NoQueryPlanCache) Retrieve(ctx *PlanningContext, _ *string, planner QueryPlanner) (QueryPlanList, error) {
	return planner.Plan(ctx)
}
// WithQueryPlanCache sets the query plan cache that the gateway will use.
func WithQueryPlanCache(cache QueryPlanCache) Option {
	return func(gateway *Gateway) {
		gateway.queryPlanCache = cache
	}
}
// WithAutomaticQueryPlanCache enables the "automatic persisted query" technique.
func WithAutomaticQueryPlanCache() Option {
	// build a cache with the default TTL and hand it to the gateway
	automaticCache := NewAutomaticQueryPlanCache()
	return WithQueryPlanCache(automaticCache)
}
// queryPlanCacheItem is a single entry in the automatic query plan cache.
type queryPlanCacheItem struct {
	// LastUsed always holds a time.Time recording when this plan was last retrieved;
	// the cleanup pass reads it to decide whether the entry has expired.
	LastUsed atomic.Value // time.Time
	// Value is the cached query plan list itself.
	Value QueryPlanList
}
// AutomaticQueryPlanCache is a QueryPlanCache that will use the hash if it points to a known query plan,
// otherwise it will compute the plan and save it for later, to be referenced by the designated hash.
type AutomaticQueryPlanCache struct {
	cache *sync.Map // map[string]*queryPlanCacheItem
	// ttl is how long an unused plan may stay cached before the cleanup pass deletes it.
	ttl time.Duration
	// the automatic query plan cache needs to clear itself of query plans that haven't been used
	// recently. This coordination requires a channel over which events can be triggered whenever
	// a query is fired, triggering a check to clean up other queries.
	retrievedPlan chan bool
	// a boolean to track if there is a timer that needs to be reset
	resetTimer bool
	// a mutex on the timer bool
	timeMutex sync.Mutex
}
// WithCacheTTL updates and returns the cache with the new cache lifetime. Queries that haven't been
// used in that long are cleaned up on the next query.
func (c *AutomaticQueryPlanCache) WithCacheTTL(duration time.Duration) *AutomaticQueryPlanCache {
	// the copy shares the underlying storage and coordination channel but carries the
	// new lifetime; its mutex starts out zero-valued and independent of the original's
	updated := &AutomaticQueryPlanCache{
		cache:         c.cache,
		ttl:           duration,
		retrievedPlan: c.retrievedPlan,
		resetTimer:    c.resetTimer,
	}
	return updated
}
// NewAutomaticQueryPlanCache returns a fresh instance of AutomaticQueryPlanCache
// configured with the default cache lifetime.
func NewAutomaticQueryPlanCache() *AutomaticQueryPlanCache {
	return &AutomaticQueryPlanCache{
		cache: new(sync.Map),
		// default cache lifetime of 10 days
		// NOTE(review): the comment here previously said "3 days" while the value is
		// 10 days — confirm which lifetime is intended (this file is a fork).
		ttl:           10 * 24 * time.Hour,
		retrievedPlan: make(chan bool),
		resetTimer:    false,
	}
}
// Retrieve follows the "automatic query persistence" technique. If the hash is known, it will use the referenced query plan.
// If the hash is not known but the query is provided, it will compute the plan, return it, and save it for later use
// (writing the computed hash back through *hash when the client didn't supply one).
// If the hash is not known and the query is not provided, it will return with an error prompting the client to provide the hash and query.
func (c *AutomaticQueryPlanCache) Retrieve(ctx *PlanningContext, hash *string, planner QueryPlanner) (QueryPlanList, error) {
	// when we're done with retrieving the value we have to clear the cache
	defer func() {
		// spawn a goroutine that might be responsible for clearing the cache
		go func() {
			// check if there is a timer to reset
			c.timeMutex.Lock()
			resetTimer := c.resetTimer
			c.timeMutex.Unlock()

			// if there is already a goroutine that's waiting to clean things up
			if resetTimer {
				// just reset their time
				// NOTE(review): if the cleanup goroutine's timer fires between the check
				// above and this send, no receiver remains on the unbuffered channel and
				// this send blocks forever, leaking this goroutine — confirm; a
				// select with a default case would make the send best-effort.
				c.retrievedPlan <- true
				// and we're done
				return
			}

			// NOTE(review): two goroutines can both observe resetTimer == false before
			// either re-acquires the lock below, producing two concurrent cleanup
			// loops — confirm whether the check-and-set should happen under one lock.
			c.timeMutex.Lock()
			c.resetTimer = true
			c.timeMutex.Unlock()

			// otherwise this is the goroutine responsible for cleaning up the cache
			timer := time.NewTimer(c.ttl)

			// we will have to consume more than one input
		TRUE_LOOP:
			for {
				select {
				// if another plan was retrieved
				case <-c.retrievedPlan:
					// reset the time
					timer.Reset(c.ttl)

				// if the timer dinged
				case <-timer.C:
					// there is no longer a timer to reset
					c.timeMutex.Lock()
					c.resetTimer = false
					c.timeMutex.Unlock()

					// loop over every item in the cache
					c.cache.Range(func(key, value interface{}) bool {
						key, cacheItem := key.(string), value.(*queryPlanCacheItem)

						// if the cached query hasn't been used recently enough
						lastUsed := cacheItem.LastUsed.Load().(time.Time)
						if lastUsed.Before(time.Now().Add(-c.ttl)) {
							// delete it from the cache
							c.cache.Delete(key)
						}
						return true
					})

					// stop consuming
					break TRUE_LOOP
				}
			}
		}()
	}()

	// if we have a cached value for the hash
	if value, hasCachedValue := c.cache.Load(*hash); hasCachedValue {
		cached := value.(*queryPlanCacheItem)
		// update the last used so the cleanup pass doesn't evict a live entry
		cached.LastUsed.Store(time.Now())
		// return it
		return cached.Value, nil
	}

	// we dont have a cached value

	// if we were not given a query string
	if ctx.Query == "" {
		// return an error with the magic string; the client is expected to re-send
		// the request with the full query body attached
		return nil, errors.New(MessageMissingCachedQuery)
	}

	// compute the plan
	plan, err := planner.Plan(ctx)
	if err != nil {
		return nil, err
	}

	// if there is no hash
	if *hash == "" {
		hashString := sha256.Sum256([]byte(ctx.Query))
		// generate a hash that will identify the query for later use and report it
		// back to the caller through the pointer
		*hash = hex.EncodeToString(hashString[:])
	}

	// save it for later
	cacheItem := &queryPlanCacheItem{
		Value: plan,
	}
	cacheItem.LastUsed.Store(time.Now())
	// LoadOrStore avoids clobbering an entry stored concurrently by another goroutine;
	// if one already exists we only refresh its last-used timestamp
	if actual, exists := c.cache.LoadOrStore(*hash, cacheItem); exists {
		actual.(*queryPlanCacheItem).LastUsed.Store(cacheItem.LastUsed.Load())
	}

	// we're done
	return plan, nil
}