-
-
Notifications
You must be signed in to change notification settings - Fork 9
/
resource.go
419 lines (371 loc) · 11.9 KB
/
resource.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
package res
import (
"net/url"
nats "github.com/nats-io/nats.go"
)
// Resource provides access to a resource's name, type, parameters, query and
// value, together with the methods used to send events on that resource.
type Resource interface {
// Service returns the service instance
Service() *Service
// ResourceName returns the resource name.
ResourceName() string
// ResourceType returns the resource type.
ResourceType() ResourceType
// PathParams returns parameters that are derived from the resource name.
PathParams() map[string]string
// PathParam returns the key placeholder parameter value derived from the resource name.
PathParam(string) string
// Query returns the query part of the resource ID without the question mark separator.
Query() string
// Group which the resource shares worker goroutine with.
// Will be the resource name if no specific group was set.
Group() string
// ParseQuery parses the query and returns the corresponding values.
// It silently discards malformed value pairs.
// To check errors use url.ParseQuery(Query()).
ParseQuery() url.Values
// Value gets the resource value as provided from the Get resource handlers.
// If it fails to get the resource value, or no get handler is
// defined, it returns a nil interface and a *Error type error.
Value() (interface{}, error)
// RequireValue gets the resource value as provided from the Get resource handlers.
// Panics if it fails to get the resource value, or no get handler is defined.
RequireValue() interface{}
// Event sends a custom event on the resource.
// Will panic if the event is one of the pre-defined or reserved events,
// "change", "delete", "add", "remove", "patch", "reaccess", "unsubscribe", or "query".
// For pre-defined events, the matching method, ChangeEvent, AddEvent,
// RemoveEvent, CreateEvent, DeleteEvent, or ReaccessEvent should be used instead.
//
// This is to ensure compliance with the specifications:
// https://github.com/resgateio/resgate/blob/master/docs/res-service-protocol.md#events
Event(event string, payload interface{})
// ChangeEvent sends a change event with the properties that have been changed
// and their new values.
// If props is empty, no event is sent.
// Panics if the resource is not a Model.
// The values must be serializable into JSON primitives, resource references,
// or delete action objects.
// See the protocol specification for more information:
// https://github.com/resgateio/resgate/blob/master/docs/res-service-protocol.md#model-change-event
ChangeEvent(props map[string]interface{})
// AddEvent sends an add event, adding the value at index idx.
// Panics if the resource is not a Collection, or if idx is less than 0.
// The value must be serializable into a JSON primitive or resource reference.
// See the protocol specification for more information:
// https://github.com/resgateio/resgate/blob/master/docs/res-service-protocol.md#collection-add-event
AddEvent(value interface{}, idx int)
// RemoveEvent sends a remove event, removing the value at index idx.
// Panics if the resource is not a Collection, or if idx is less than 0.
// See the protocol specification for more information:
// https://github.com/resgateio/resgate/blob/master/docs/res-service-protocol.md#collection-remove-event
RemoveEvent(idx int)
// ReaccessEvent sends a reaccess event to signal that the resource's access permissions have changed.
// It will invalidate any previous access response sent for the resource.
// See the protocol specification for more information:
// https://github.com/resgateio/resgate/blob/master/docs/res-service-protocol.md#reaccess-event
ReaccessEvent()
// ResetEvent sends a reset event to signal that the resource's data has changed.
// It will invalidate any previous get response sent for the resource.
// See the protocol specification for more information:
// https://github.com/resgateio/resgate/blob/master/docs/res-service-protocol.md#system-reset-event
ResetEvent()
// QueryEvent sends a query event to signal that the query resource's underlying data has been modified.
// The callback is invoked for query requests that follow the event.
// See the protocol specification for more information:
// https://github.com/resgateio/resgate/blob/master/docs/res-service-protocol.md#query-event
QueryEvent(func(QueryRequest))
// CreateEvent sends a create event, to signal the resource has been created, with
// value being the resource value.
CreateEvent(value interface{})
// DeleteEvent sends a delete event, to signal the resource has been deleted.
DeleteEvent()
}
// resource is the internal implementation of the Resource interface.
type resource struct {
// rname is the resource name; it is used to build event subjects.
rname string
// pathParams holds the parameters derived from the resource name.
pathParams map[string]string
// query is the query part of the resource ID, without the question mark.
query string
// group is the value reported by Group.
group string
// h is the handler supplying the resource type and the optional Apply* callbacks.
h Handler
// listeners are callbacks invoked with an *Event after an event has been sent.
listeners []func(*Event)
// s is the service used to send events for the resource.
s *Service
}
// Service returns the service instance held by the resource.
func (r *resource) Service() *Service {
return r.s
}
// ResourceName returns the resource name, the same name that is used
// when building the subjects of events sent on the resource.
func (r *resource) ResourceName() string {
return r.rname
}
// ResourceType returns the resource type, as declared by the Type field
// of the resource's handler.
func (r *resource) ResourceType() ResourceType {
return r.h.Type
}
// PathParams returns parameters that are derived from the resource name.
// The resource's own map is returned, not a copy, so modifications made by
// the caller are visible to later calls.
func (r *resource) PathParams() map[string]string {
return r.pathParams
}
// PathParam returns the parameter derived from the resource name for the key placeholder.
// It returns an empty string if there is no parameter for key.
func (r *resource) PathParam(key string) string {
return r.pathParams[key]
}
// Query returns the query part of the resource ID without the question mark separator.
// The string is returned as stored, without any parsing or validation;
// use ParseQuery to get the decoded values.
func (r *resource) Query() string {
return r.query
}
// Group returns the group which the resource shares the worker goroutine with.
// Will be the resource name if no specific group was set.
func (r *resource) Group() string {
return r.group
}
// ParseQuery decodes the resource's query string into url.Values.
// Malformed pairs are dropped without notice; callers that need to know
// about decoding errors should call url.ParseQuery(r.Query()) themselves.
func (r *resource) ParseQuery() url.Values {
	// url.ParseQuery returns every pair it could decode together with the
	// first error it met; the error is intentionally discarded here.
	values, _ := url.ParseQuery(r.query)
	return values
}
// Value fetches the resource value by running the resource's handler
// through an internal get request, and returns the value and error that
// the request ended up with.
// Per the Resource interface contract, a failure or a missing get handler
// yields a nil interface and a *Error type error.
func (r *resource) Value() (interface{}, error) {
	req := new(getRequest)
	req.resource = r
	req.executeHandler()
	return req.value, req.err
}
// RequireValue returns the resource value obtained through Value.
// It panics with the error if the underlying call to Value returns one.
func (r *resource) RequireValue() interface{} {
	value, err := r.Value()
	if err == nil {
		return value
	}
	panic(err)
}
// Event sends a custom event on the resource and then notifies the
// resource's listeners, if any.
//
// It panics if event is one of the pre-defined or reserved event names,
// "change", "create", "delete", "add", "remove", "patch", "reaccess",
// "unsubscribe", or "query". For pre-defined events, the matching method,
// ChangeEvent, AddEvent, RemoveEvent, CreateEvent, DeleteEvent, or
// ReaccessEvent should be used instead.
// It also panics if event is not a valid subject part (see isValidPart).
//
// This is to ensure compliance with the specifications:
// https://github.com/resgateio/resgate/blob/master/docs/res-service-protocol.md#events
func (r *resource) Event(event string, payload interface{}) {
	switch event {
	case "change":
		panic("res: use ChangeEvent to send change events")
	case "create":
		// Sending "create" as a custom event would bypass ApplyCreate and
		// attach a payload to an event that CreateEvent sends without one.
		panic("res: use CreateEvent to send create events")
	case "delete":
		panic("res: use DeleteEvent to send delete events")
	case "add":
		panic("res: use AddEvent to send add events")
	case "remove":
		panic("res: use RemoveEvent to send remove events")
	case "patch":
		panic(`res: "patch" is a reserved event name`)
	case "reaccess":
		panic("res: use ReaccessEvent to send a reaccess event")
	case "unsubscribe":
		panic(`res: "unsubscribe" is a reserved event name`)
	case "query":
		panic(`res: "query" is a reserved event name`)
	}

	// The event name becomes the last part of the subject, so it must not
	// contain separators or wildcard characters.
	if !isValidPart(event) {
		panic(`res: invalid event name`)
	}

	r.s.event("event."+r.rname+"."+event, payload)

	if r.listeners != nil {
		ev := &Event{
			Name:     event,
			Resource: r,
			Payload:  payload,
		}
		for _, cb := range r.listeners {
			cb(ev)
		}
	}
}
// ChangeEvent sends a change event on the resource.
// The keys of changed are the model property names and the values are the
// new property values. Nothing is sent when changed is empty.
//
// If the handler has an ApplyChange callback it is called first; an error
// from it causes a panic, and a non-nil but empty result suppresses the
// event. The callback's result is passed to listeners as the old values.
//
// Panics if the resource is a collection.
func (r *resource) ChangeEvent(changed map[string]interface{}) {
	if r.h.Type == TypeCollection {
		panic("res: change event not allowed on Collections")
	}
	if len(changed) == 0 {
		return
	}

	var old map[string]interface{}
	if apply := r.h.ApplyChange; apply != nil {
		var err error
		if old, err = apply(r, changed); err != nil {
			panic(err)
		}
		// A nil result still sends the event; only an explicitly empty map
		// stops it (presumably meaning that no property actually changed).
		if old != nil && len(old) == 0 {
			return
		}
	}

	r.s.event("event."+r.rname+".change", changeEvent{Values: changed})

	if r.listeners == nil {
		return
	}
	ev := &Event{Name: "change", Resource: r, NewValues: changed, OldValues: old}
	for _, listener := range r.listeners {
		listener(ev)
	}
}
// AddEvent sends an add event, telling that value v was added at index idx.
// If the handler has an ApplyAdd callback it is called before the event is
// sent, and an error from it causes a panic.
//
// Panics if the resource is a model, or if idx is less than zero.
func (r *resource) AddEvent(v interface{}, idx int) {
	switch {
	case r.h.Type == TypeModel:
		panic("res: add event not allowed on models")
	case idx < 0:
		panic("res: add event idx less than zero")
	}

	if apply := r.h.ApplyAdd; apply != nil {
		if err := apply(r, v, idx); err != nil {
			panic(err)
		}
	}

	r.s.event("event."+r.rname+".add", addEvent{Value: v, Idx: idx})

	if r.listeners == nil {
		return
	}
	ev := &Event{Name: "add", Resource: r, Value: v, Idx: idx}
	for _, listener := range r.listeners {
		listener(ev)
	}
}
// RemoveEvent sends a remove event, telling that the value at index idx
// was removed.
// If the handler has an ApplyRemove callback it is called before the event
// is sent; an error from it causes a panic, and the value it returns is
// handed to listeners as the removed value. Without the callback, listeners
// get a nil value.
//
// Panics if the resource is a model, or if idx is less than zero.
func (r *resource) RemoveEvent(idx int) {
	switch {
	case r.h.Type == TypeModel:
		panic("res: remove event not allowed on models")
	case idx < 0:
		panic("res: remove event idx less than zero")
	}

	var removed interface{}
	if apply := r.h.ApplyRemove; apply != nil {
		var err error
		if removed, err = apply(r, idx); err != nil {
			panic(err)
		}
	}

	r.s.event("event."+r.rname+".remove", removeEvent{Idx: idx})

	if r.listeners == nil {
		return
	}
	ev := &Event{Name: "remove", Resource: r, Value: removed, Idx: idx}
	for _, listener := range r.listeners {
		listener(ev)
	}
}
// ReaccessEvent sends a reaccess event on the resource.
// The event is sent as a raw event without any payload.
func (r *resource) ReaccessEvent() {
	subject := "event." + r.rname + ".reaccess"
	r.s.rawEvent(subject, nil)
}
// ResetEvent asks the service to send a system.reset event covering only
// this resource: the resource name is the sole entry of the first list
// passed to Service.Reset, and the second argument is nil.
func (r *resource) ResetEvent() {
	resources := []string{r.rname}
	r.s.Reset(resources, nil)
}
// QueryEvent sends a query event on the resource and arranges for cb to be
// called on the query requests that follow.
//
// A fresh inbox subject is subscribed to before the event is sent, and the
// subject is included in the event. If the subscription fails, cb is called
// once with nil, the error is logged, and no event is sent.
// According to the original contract, the last call to cb is always made
// with nil, indicating that the query event duration has expired; that
// part is handled by the query listener and the service's query queue.
func (r *resource) QueryEvent(cb func(QueryRequest)) {
	inbox := nats.NewInbox()
	msgs := make(chan *nats.Msg, queryEventChannelSize)

	sub, err := r.s.nc.ChanSubscribe(inbox, msgs)
	if err != nil {
		cb(nil)
		r.s.errorf("Failed to subscribe to query event: %s", err)
		return
	}

	// The query event keeps its own copy of the resource.
	qe := &queryEvent{r: *r, sub: sub, ch: msgs, cb: cb}

	r.s.event("event."+r.rname+".query", resQueryEvent{Subject: inbox})

	go qe.startQueryListener()
	r.s.queryTQ.Add(qe)
}
// CreateEvent sends a create event for the resource.
// The data argument is the created resource data; it is handed to the
// handler's ApplyCreate callback (if set) and to listeners, while the event
// itself is sent as a raw event without payload.
//
// Panics if ApplyCreate returns an error.
func (r *resource) CreateEvent(data interface{}) {
	if apply := r.h.ApplyCreate; apply != nil {
		if err := apply(r, data); err != nil {
			panic(err)
		}
	}

	r.s.rawEvent("event."+r.rname+".create", nil)

	if r.listeners == nil {
		return
	}
	ev := &Event{Name: "create", Resource: r, Data: data}
	for _, listener := range r.listeners {
		listener(ev)
	}
}
// DeleteEvent sends a delete event for the resource.
// If the handler has an ApplyDelete callback it is called first; an error
// from it causes a panic, and the data it returns is handed to listeners.
// Without the callback, listeners get nil data. The event itself is sent as
// a raw event without payload.
func (r *resource) DeleteEvent() {
	var deleted interface{}
	if apply := r.h.ApplyDelete; apply != nil {
		var err error
		if deleted, err = apply(r); err != nil {
			panic(err)
		}
	}

	r.s.rawEvent("event."+r.rname+".delete", nil)

	if r.listeners == nil {
		return
	}
	ev := &Event{Name: "delete", Resource: r, Data: deleted}
	for _, listener := range r.listeners {
		listener(ev)
	}
}
// isValidPart reports whether p can be used as a single part of a subject.
// A valid part is non-empty and consists solely of characters in the range
// 33 to 126 (printable, non-space ASCII), excluding '?', '*', '>' and '.'.
func isValidPart(p string) bool {
	if len(p) == 0 {
		return false
	}
	for _, c := range p {
		switch {
		case c < 33, c > 126:
			// Control characters, space, DEL, and anything outside ASCII.
			return false
		case c == '?', c == '*', c == '>', c == '.':
			// Query separator, wildcards, and the part separator.
			return false
		}
	}
	return true
}