Skip to content

Commit

Permalink
chore: revert payloads-related changes (#964)
Browse files Browse the repository at this point in the history
  • Loading branch information
dunglas authored Oct 23, 2024
1 parent 04030c1 commit 7857d78
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 163 deletions.
8 changes: 3 additions & 5 deletions authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ type claims struct {
}

type mercureClaim struct {
Publish []string `json:"publish"`
Subscribe []string `json:"subscribe"`
// Deprecated: use the Payloads field instead
Payload interface{} `json:"payload"`
Payloads map[string]interface{} `json:"payloads"`
Publish []string `json:"publish"`
Subscribe []string `json:"subscribe"`
Payload interface{} `json:"payload"`
}

type role int
Expand Down
30 changes: 1 addition & 29 deletions docs/UPGRADE.md
Original file line number Diff line number Diff line change
@@ -1,34 +1,6 @@
# Upgrade

## 0.20

The `mercure.payload` JWT key has been deprecated. It is now possible to make topic-specific data
available in subscriptions events and through the subscription API.
To make data available in all events and API responses describing subscriptions, use the `*` topic selector.

Before:

```json
{
"mercure": {
"payload": { "foo": "bar" }
}
}
```

After:

```json
{
"mercure": {
"payloads": {
"*": { "foo": "bar" }
}
}
}
```

[Read the updated specification](../spec/mercure.md#payloads) to learn how to leverage this new feature.
## 0.17

The `MERCURE_TRANSPORT_URL` environment variable and the `transport_url` directive have been deprecated.
Use the new `transport` directive instead.
Expand Down
11 changes: 5 additions & 6 deletions hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,12 @@ func createAnonymousDummy(options ...Option) *Hub {
}

func createDummyAuthorizedJWT(r role, topics []string) string {
payloads := map[string]interface{}{"*": make(map[string]string)}
payloads["*"].(map[string]string)["foo"] = "bar"

return createDummyAuthorizedJWTWithPayload(r, topics, payloads)
return createDummyAuthorizedJWTWithPayload(r, topics, struct {
Foo string `json:"foo"`
}{Foo: "bar"})
}

func createDummyAuthorizedJWTWithPayload(r role, topics []string, payloads map[string]interface{}) string {
func createDummyAuthorizedJWTWithPayload(r role, topics []string, payload interface{}) string {
token := jwt.New(jwt.SigningMethodHS256)

var key []byte
Expand All @@ -283,7 +282,7 @@ func createDummyAuthorizedJWTWithPayload(r role, topics []string, payloads map[s
token.Claims = &claims{
Mercure: mercureClaim{
Subscribe: topics,
Payloads: payloads,
Payload: payload,
},
RegisteredClaims: jwt.RegisteredClaims{},
}
Expand Down
34 changes: 7 additions & 27 deletions spec/mercure.md
Original file line number Diff line number Diff line change
Expand Up @@ -472,34 +472,14 @@ Consequently, this private update will be received by this subscriber, while oth
a canonical topic matched by the selector provided in a `topic` query parameter but not matched by
selectors in the `mercure.subscribe` claim will not.

## Payloads

User-defined data can be attached to subscriptions and made available through the subscription API
and in subscription events.
## Payload

The `mercure` claim of the JWS **CAN** also contain user-defined values under the `payload` key.
This JSON document will be attached to the subscription and made available in subscription events.
See (#subscription-events).

The `mercure` claim of the JWS **CAN** contain a JSON object under the `payloads` key.
This JSON document **MUST** have selectors as keys, and user-defined data as values.

The value associated with the first topic selector matching the topic of the subscription
**MUST** be included under the `payload` key in the JSON object describing a subscription in
the subscription API and in subscription events.

Example JWT document containing payloads:

~~~ json
{
"subscribe": ["https://example.com/foo", "https://example.com/bar/baz"]
"payloads": {
"https://example.com/bar/{val}": {"custom": "data only available for subscriptions matching this selector"},
"*": {"data": "available for all other topics"}
}
}
~~~

For instance, payloads can contain the user ID of the subscriber, its username, a list of groups it
belongs to, or its IP address. Storing data in `mercure.payloads` is a convenient way to share data
For instance, `mercure.payload` can contain the user ID of the subscriber, a list of groups it
belongs to, or its IP address. Storing data in `mercure.payload` is a convenient way to share data
related to one subscriber to other subscribers.

# Reconnection, State Reconciliation and Event Sourcing {#reconciliation}
Expand Down Expand Up @@ -595,8 +575,8 @@ least the following properties:
* `topic`: the topic selector used of this subscription
* `subscriber`: the topic identifier of the subscriber. It **SHOULD** be an IRI.
* `active`: `true` when the subscription is active, and `false` when it is terminated
* `payload` (optional): content of the `mercure.payloads` in the subscriber's JWS matching the topic
(see (#authorization))
* `payload` (optional): the content of `mercure.payload` in the subscriber's JWS (see
(#authorization))

The JSON-LD document **MAY** contain other properties.

Expand Down
44 changes: 7 additions & 37 deletions subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,47 +203,17 @@ func (h *Hub) registerSubscriber(w http.ResponseWriter, r *http.Request) (*Subsc
rc := h.newResponseController(w, s)
rc.flush()

h.normalizeClaims(claims)
h.logNewSubscriber(claims, s)
h.metrics.SubscriberConnected(s)

return s, rc
}

func (h *Hub) logNewSubscriber(claims *claims, s *Subscriber) {
c := h.logger.Check(zap.InfoLevel, "New subscriber")
if c == nil {
return
}

fields := []LogField{zap.Object("subscriber", s)}
if claims != nil && h.logger.Level() == zap.DebugLevel {
if claims.Mercure.Payload != nil && h.opt.isBackwardCompatiblyEnabledWith(8) {
fields = append(
fields,
zap.Reflect("payload", claims.Mercure.Payload),
)
if c := h.logger.Check(zap.InfoLevel, "New subscriber"); c != nil {
fields := []LogField{zap.Object("subscriber", s)}
if claims != nil && h.logger.Level() == zap.DebugLevel {
fields = append(fields, zap.Reflect("payload", claims.Mercure.Payload))
}

fields = append(
fields,
zap.Reflect("payloads", claims.Mercure.Payloads),
)
}

c.Write(fields...)
}

func (h *Hub) normalizeClaims(c *claims) {
if c == nil || c.Mercure.Payload == nil {
return
c.Write(fields...)
}
h.metrics.SubscriberConnected(s)

if h.opt.isBackwardCompatiblyEnabledWith(8) {
h.logger.Info(`Deprecated: the "mercure.payload" JWT claim deprecated since the version 8 of the protocol, use "mercure.payloads" claim with a "*" key instead.`)
} else {
c.Mercure.Payload = nil
}
return s, rc
}

// sendHeaders sends correct HTTP headers to create a keep-alive connection.
Expand Down
18 changes: 9 additions & 9 deletions subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,12 @@ func TestSubscribe(t *testing.T) {
testSubscribe(t, 3)
}

func testSubscribeLogs(t *testing.T, hub *Hub, payloads map[string]interface{}) {
func testSubscribeLogs(t *testing.T, hub *Hub, payload interface{}) {
t.Helper()

ctx, cancel := context.WithCancel(context.Background())
req := httptest.NewRequest(http.MethodGet, defaultHubURL+"?topic=http://example.com/reviews/{id}", nil).WithContext(ctx)
req.AddCookie(&http.Cookie{Name: "mercureAuthorization", Value: createDummyAuthorizedJWTWithPayload(roleSubscriber, []string{"http://example.com/reviews/22"}, payloads)})
req.AddCookie(&http.Cookie{Name: "mercureAuthorization", Value: createDummyAuthorizedJWTWithPayload(roleSubscriber, []string{"http://example.com/reviews/22"}, payload)})

w := &responseTester{
expectedStatusCode: http.StatusOK,
Expand All @@ -351,18 +351,18 @@ func testSubscribeLogs(t *testing.T, hub *Hub, payloads map[string]interface{})

func TestSubscribeWithLogLevelDebug(t *testing.T) {
core, logs := observer.New(zapcore.DebugLevel)
payloads := map[string]interface{}{
"*": make(map[string]string),
payload := map[string]interface{}{
"bar": "baz",
"foo": "bar",
}

payloads["*"].(map[string]string)["bar"] = "baz"
payloads["*"].(map[string]string)["foo"] = "bar"

testSubscribeLogs(t, createDummy(
WithLogger(zap.New(core)),
), payloads)
), payload)

assert.Equal(t, 1, logs.FilterMessage("New subscriber").FilterFieldKey("payloads").Len())
assert.Equal(t, 1, logs.FilterMessage("New subscriber").FilterField(
zap.Reflect("payload", payload)).Len(),
)
}

func TestSubscribeLogLevelInfo(t *testing.T) {
Expand Down
17 changes: 2 additions & 15 deletions subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,21 +209,8 @@ func (s *Subscriber) getSubscriptions(topic, context string, active bool) []subs
Topic: t,
Active: active,
}
if s.Claims != nil { //nolint:nestif
if s.Claims.Mercure.Payloads == nil {
if s.Claims.Mercure.Payload != nil {
subscription.Payload = s.Claims.Mercure.Payload
}
} else {
for k, v := range s.Claims.Mercure.Payloads {
if !s.topicSelectorStore.match(t, k) {
continue
}
subscription.Payload = v

break
}
}
if s.Claims != nil && s.Claims.Mercure.Payload != nil {
subscription.Payload = s.Claims.Mercure.Payload
}

subscriptions = append(subscriptions, subscription)
Expand Down
35 changes: 0 additions & 35 deletions subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,38 +210,3 @@ func TestSubscriptionHandler(t *testing.T) {
assert.Equal(t, http.StatusNotFound, res.StatusCode)
res.Body.Close()
}

func TestSubscriptionPayload(t *testing.T) {
logger := zap.NewNop()
tss := &TopicSelectorStore{}

for _, selector := range []string{"*", "http://example.com/foo", "http://example.com/{var}"} {
t.Run("selector "+selector, func(t *testing.T) {
hub := createDummy(WithLogger(logger))

s1 := NewSubscriber("", logger, tss)
s1.SetTopics([]string{"http://example.com/foo"}, nil)

s1.Claims = &claims{}
s1.Claims.Mercure.Payloads = map[string]interface{}{}
s1.Claims.Mercure.Payloads[selector] = "foo"
s1.Claims.Mercure.Payloads["http://example.com/bar"] = "bar"

require.NoError(t, hub.transport.AddSubscriber(s1))

req := httptest.NewRequest(http.MethodGet, defaultHubURL+subscriptionsPath, nil)
req.AddCookie(&http.Cookie{Name: "mercureAuthorization", Value: createDummyAuthorizedJWT(roleSubscriber, []string{"/.well-known/mercure/subscriptions"})})
w := httptest.NewRecorder()
hub.SubscriptionsHandler(w, req)
res := w.Result()
assert.Equal(t, http.StatusOK, res.StatusCode)
res.Body.Close()

var subscriptions subscriptionCollection
json.Unmarshal(w.Body.Bytes(), &subscriptions)

require.Len(t, subscriptions.Subscriptions, 1)
assert.Equal(t, "foo", subscriptions.Subscriptions[0].Payload)
})
}
}

0 comments on commit 7857d78

Please sign in to comment.