Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ GraphQL-AnyCable uses [anyway_config] to configure itself. There are several pos
```.env
GRAPHQL_ANYCABLE_SUBSCRIPTION_EXPIRATION_SECONDS=604800
GRAPHQL_ANYCABLE_USE_REDIS_OBJECT_ON_CLEANUP=true
GRAPHQL_ANYCABLE_HANDLE_LEGACY_SUBSCRIPTIONS=false
GRAPHQL_ANYCABLE_USE_CLIENT_PROVIDED_UNIQ_ID=false
```

Expand All @@ -150,6 +151,7 @@ GraphQL-AnyCable uses [anyway_config] to configure itself. There are several pos
production:
subscription_expiration_seconds: 300 # 5 minutes
use_redis_object_on_cleanup: false # For restricted redis installations
handle_legacy_subscriptions: false # For seamless upgrade from pre-1.3 versions
use_client_provided_uniq_id: false # To avoid problems with non-uniqueness of Apollo channel identifiers
```

Expand All @@ -174,26 +176,36 @@ As in AnyCable there is no place to store subscription data in-memory, it should
=> 1:myStats:/MyStats/fBDZmJU1UGTorQWvOyUeaHVwUxJ3T9SEqnetj6SKGXc=/0/RBNvo1WzZ4oRRq0W9-hknpT7T8If536DEMBg9hyq_4o=
```

2. Event subscriptions: `graphql-subscriptions:#{event.fingerptint}` set containing identifiers for all subscriptions for given operation with certain context and arguments (serialized in _topic_). Fingerprints are already scoped by topic.
2. Subscription data: `graphql-fingerprint:#{event.fingerptint}` hash contains everything required to evaluate subscription on trigger and create data for client.

```
HGETALL graphql-fingerprint:1:myStats:/MyStats/fBDZmJU1UGTorQWvOyUeaHVwUxJ3T9SEqnetj6SKGXc=/0/RBNvo1WzZ4oRRq0W9-hknpT7T8If536DEMBg9hyq_4o=
=> {
context: '{"user_id":1,"user":{"__gid__":"Z2lkOi8vZWJheS1tYWcyL1VzZXIvMQ"}}',
variables: '{}',
operation_name: 'MyStats'
query_string: 'subscription MyStats { myStatsUpdated { completed total processed __typename } }',
}
```


3. Event subscriptions: `graphql-subscriptions:#{event.fingerptint}` set containing identifiers for all subscriptions for given operation with certain context and arguments (serialized in _topic_). Fingerprints are already scoped by topic.

```
SMEMBERS graphql-subscriptions:1:myStats:/MyStats/fBDZmJU1UGTorQWvOyUeaHVwUxJ3T9SEqnetj6SKGXc=/0/RBNvo1WzZ4oRRq0W9-hknpT7T8If536DEMBg9hyq_4o=
=> 52ee8d65-275e-4d22-94af-313129116388
```

3. Subscription data: `graphql-subscription:#{subscription_id}` hash contains everything required to evaluate subscription on trigger and create data for client.
4. Subscription data: `graphql-subscription:#{subscription_id}` hash contains everything required to evaluate subscription on trigger and create data for client.

```
HGETALL graphql-subscription:52ee8d65-275e-4d22-94af-313129116388
=> {
context: '{"user_id":1,"user":{"__gid__":"Z2lkOi8vZWJheS1tYWcyL1VzZXIvMQ"}}',
variables: '{}',
operation_name: 'MyStats'
query_string: 'subscription MyStats { myStatsUpdated { completed total processed __typename } }',
events: '{"1:myStats:":"1:myStats:/MyStats/fBDZmJU1UGTorQWvOyUeaHVwUxJ3T9SEqnetj6SKGXc=/0/RBNvo1WzZ4oRRq0W9-hknpT7T8If536DEMBg9hyq_4o="}',
}
```

4. Channel subscriptions: `graphql-channel:#{channel_id}` set containing identifiers for subscriptions created in ActionCable channel to delete them on client disconnect.
5. Channel subscriptions: `graphql-channel:#{channel_id}` set containing identifiers for subscriptions created in ActionCable channel to delete them on client disconnect.

```
SMEMBERS graphql-channel:17420c6ed9e
Expand Down
1 change: 1 addition & 0 deletions lib/graphql/anycable/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class Config < Anyway::Config

attr_config subscription_expiration_seconds: nil
attr_config use_redis_object_on_cleanup: true
attr_config handle_legacy_subscriptions: false
attr_config use_client_provided_uniq_id: true
end
end
Expand Down
63 changes: 36 additions & 27 deletions lib/graphql/subscriptions/anycable_subscriptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ class AnyCableSubscriptions < GraphQL::Subscriptions

def_delegators :"GraphQL::AnyCable", :redis, :config

SUBSCRIPTION_PREFIX = "graphql-subscription:" # HASH: Stores subscription data: query, context, …
FINGERPRINTS_PREFIX = "graphql-fingerprints:" # ZSET: To get fingerprints by topic
FINGERPRINT_PREFIX = "graphql-fingerprint:" # HASH: Stores subscription data: query, context, …
SUBSCRIPTIONS_PREFIX = "graphql-subscriptions:" # SET: To get subscriptions by fingerprint
SUBSCRIPTION_PREFIX = "graphql-subscription:" # HASH: Stores mapping between topics and fingerprint for given subscription, …
CHANNEL_PREFIX = "graphql-channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup

# @param serializer [<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)`
Expand All @@ -70,34 +71,21 @@ def initialize(serializer: Serialize, **rest)
# An event was triggered.
# Re-evaluate all subscribed queries and push the data over ActionCable.
def execute_all(event, object)
fingerprints = redis.zrange(FINGERPRINTS_PREFIX + event.topic, 0, -1)
fingerprints = redis.zrange(FINGERPRINTS_PREFIX + event.topic, 0, -1, with_scores: true)
return if fingerprints.empty?

fingerprint_subscription_ids = Hash[fingerprints.zip(
redis.pipelined do |pipeline|
fingerprints.map do |fingerprint|
pipeline.smembers(SUBSCRIPTIONS_PREFIX + fingerprint)
end
end
)]

fingerprint_subscription_ids.each do |fingerprint, subscription_ids|
execute_grouped(fingerprint, subscription_ids, event, object)
fingerprints.each do |fingerprint, _subscription_count|
execute_grouped(fingerprint, event, object)
end

# Call to +trigger+ returns this. Convenient for playing in console
Hash[fingerprint_subscription_ids.map { |k,v| [k, v.size] }]
# Call to +trigger+ will return fingerprints with number of subscriptions. Convenient for playing in console
fingerprints
end

# The fingerprint has told us that this response should be shared by all subscribers,
# so just run it once, then deliver the result to every subscriber
def execute_grouped(fingerprint, subscription_ids, event, object)
return if subscription_ids.empty?

subscription_id = subscription_ids.find { |sid| redis.exists?(SUBSCRIPTION_PREFIX + sid) }
return unless subscription_id # All subscriptions has expired but haven't cleaned up yet

result = execute_update(subscription_id, event, object)
def execute_grouped(fingerprint, event, object)
result = execute_update(fingerprint, event, object)
return unless result

# Having calculated the result _once_, send the same payload to all subscribers
Expand Down Expand Up @@ -131,25 +119,30 @@ def write_subscription(query, events)
# Store subscription_id in the channel state to cleanup on disconnect
write_subscription_id(channel, channel_uniq_id)


events.each do |event|
channel.stream_from(SUBSCRIPTIONS_PREFIX + event.fingerprint)
end

data = {
fingerprint_data = {
query_string: query.query_string,
variables: query.provided_variables.to_json,
context: @serializer.dump(context.to_h),
operation_name: query.operation_name,
}

subscription_data = {
events: events.map { |e| [e.topic, e.fingerprint] }.to_h.to_json,
}

redis.multi do |pipeline|
pipeline.sadd(CHANNEL_PREFIX + channel_uniq_id, subscription_id)
pipeline.mapped_hmset(SUBSCRIPTION_PREFIX + subscription_id, data)
pipeline.mapped_hmset(SUBSCRIPTION_PREFIX + subscription_id, subscription_data)
events.each do |event|
pipeline.zincrby(FINGERPRINTS_PREFIX + event.topic, 1, event.fingerprint)
pipeline.sadd(SUBSCRIPTIONS_PREFIX + event.fingerprint, subscription_id)
pipeline.mapped_hmset(FINGERPRINT_PREFIX + event.fingerprint, fingerprint_data)
next unless config.subscription_expiration_seconds
pipeline.expire(FINGERPRINT_PREFIX + event.fingerprint, config.subscription_expiration_seconds)
end
next unless config.subscription_expiration_seconds
pipeline.expire(CHANNEL_PREFIX + channel_uniq_id, config.subscription_expiration_seconds)
Expand All @@ -158,19 +151,35 @@ def write_subscription(query, events)
end

# Return the query from "storage" (in redis)
def read_subscription(subscription_id)
def read_subscription(fingerprint)
redis.mapped_hmget(
"#{SUBSCRIPTION_PREFIX}#{subscription_id}",
"#{FINGERPRINT_PREFIX}#{fingerprint}",
:query_string, :variables, :context, :operation_name
).tap do |subscription|
return if subscription.values.all?(&:nil?) # Redis returns hash with all nils for missing key
if subscription.values.all?(&:nil?) # Redis returns hash with all nils for missing key
return unless config.handle_legacy_subscriptions

subscription = read_legacy_subscription(fingerprint)
return if subscription.values.all?(&:nil?)
end

subscription[:context] = @serializer.load(subscription[:context])
subscription[:variables] = JSON.parse(subscription[:variables])
subscription[:operation_name] = nil if subscription[:operation_name].strip == ""
end
end

def read_legacy_subscription(fingerprint)
subscription_ids = pipeline.smembers(SUBSCRIPTIONS_PREFIX + fingerprint)
subscription_ids.each do |subscription_id|
subscription = redis.mapped_hmget(
"#{SUBSCRIPTION_PREFIX}#{subscription_id}",
:query_string, :variables, :context, :operation_name,
)
return subscription unless subscription.empty?
end
end

def delete_subscription(subscription_id)
events = redis.hget(SUBSCRIPTION_PREFIX + subscription_id, :events)
events = events ? JSON.parse(events) : {}
Expand Down