Skip to content

Commit

Permalink
Merge pull request #89 from /issues/88
Browse files Browse the repository at this point in the history
Big refactor to make the backend registerable and swappable
  • Loading branch information
jwoertink authored May 19, 2024
2 parents f68b822 + 2f1360c commit 4c57275
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 29 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
jobs:
specs:
env:
REDIS_URL: redis://redis:6379
CABLE_BACKEND_URL: redis://redis:6379
strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -40,9 +40,9 @@ jobs:
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Cache Crystal
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: ~/.cache/crystal
key: ${{ runner.os }}-crystal
Expand Down
4 changes: 3 additions & 1 deletion spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ require "./support/channels/*"
Cable.configure do |settings|
settings.route = "/updates"
settings.token = "test_token"
settings.redis_ping_interval = 2.seconds
settings.url = ENV.fetch("CABLE_BACKEND_URL", "redis://localhost:6379")
settings.backend_class = Cable::RedisBackend
settings.backend_ping_interval = 2.seconds
settings.restart_error_allowance = 2
settings.on_error = ->(exception : Exception, message : String) do
FakeExceptionService.notify(exception, message: message)
Expand Down
4 changes: 2 additions & 2 deletions src/backend/dev/backend.cr
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ module Cable
@@subscriptions.delete(stream_identifier)
end

def ping_redis_subscribe
def ping_subscribe_connection
end

def ping_redis_publish
def ping_publish_connection
end
end
end
11 changes: 8 additions & 3 deletions src/backend/redis/backend.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
require "redis"

module Cable
class RedisBackend < Cable::BackendCore
register "redis" # redis://
register "rediss" # rediss://

# connection management
getter redis_subscribe : Redis::Connection = Redis::Connection.new(URI.parse(Cable.settings.url))
getter redis_publish : Redis::Client = Redis::Client.new(URI.parse(Cable.settings.url))
Expand Down Expand Up @@ -72,13 +77,13 @@ module Cable
# then publish a special channel/message broadcast
# the @server.redis_subscribe picks up this special combination
# and calls ping on the block loop for us
def ping_redis_subscribe
def ping_subscribe_connection
Cable.server.publish(Cable::INTERNAL[:channel], "ping")
end

def ping_redis_publish
def ping_publish_connection
result = redis_publish.run({"ping"})
Cable::Logger.debug { "Cable::RedisPinger.ping_redis_publish -> #{result}" }
Cable::Logger.debug { "Cable::BackendPinger.ping_publish_connection -> #{result}" }
end
end
end
6 changes: 3 additions & 3 deletions src/backend/redis/legacy/backend.cr
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,15 @@

# ping/pong

def ping_redis_subscribe
def ping_subscribe_connection
Cable.server.publish("_internal", "ping")
end

def ping_redis_publish
def ping_publish_connection
request = Redis::Request.new
request << "ping"
result = redis_subscribe._connection.send(request)
Cable::Logger.debug { "Cable::RedisPinger.ping_redis_publish -> #{result}" }
Cable::Logger.debug { "Cable::BackendPinger.ping_publish_connection -> #{result}" }
end
end
end
Expand Down
7 changes: 3 additions & 4 deletions src/cable.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
require "habitat"
require "json"
require "redis"
require "./cable/**"

# TODO: Write documentation for `Cable`
Expand Down Expand Up @@ -30,10 +29,10 @@ module Cable
Habitat.create do
setting route : String = Cable.message(:default_mount_path), example: "/cable"
setting token : String = "token", example: "token"
setting url : String = ENV.fetch("REDIS_URL", "redis://localhost:6379"), example: "redis://localhost:6379"
setting url : String = ENV["CABLE_BACKEND_URL"], example: "redis://localhost:6379"
setting disable_sec_websocket_protocol_header : Bool = false
setting backend_class : Cable::BackendCore.class = Cable::RedisBackend, example: "Cable::RedisBackend"
setting redis_ping_interval : Time::Span = 15.seconds
setting backend_class : Cable::BackendCore.class = Cable::BackendRegistry, example: "Cable::RedisBackend"
setting backend_ping_interval : Time::Span = 15.seconds
setting restart_error_allowance : Int32 = 20
setting on_error : Proc(Exception, String, Nil) = ->(exception : Exception, message : String) do
Cable::Logger.error(exception: exception) { message }
Expand Down
35 changes: 33 additions & 2 deletions src/cable/backend_core.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
module Cable
abstract class BackendCore
def self.register(uri_scheme : String, backend : BackendCore.class = self)
::Cable::BackendRegistry.register uri_scheme, backend
end

# connection management
abstract def subscribe_connection
abstract def publish_connection
Expand All @@ -17,8 +21,35 @@ module Cable
abstract def unsubscribe(stream_identifier : String)

# ping/pong
abstract def ping_subscribe_connection
abstract def ping_publish_connection
end

class BackendRegistry < BackendCore
REGISTERED_BACKENDS = {} of String => BackendCore.class

def self.register(uri_scheme : String, backend : BackendCore.class = self)
REGISTERED_BACKENDS[uri_scheme] = backend
end

@backend : BackendCore

def initialize
@backend = REGISTERED_BACKENDS[URI.parse(::Cable.settings.url).scheme].new
end

abstract def ping_redis_subscribe
abstract def ping_redis_publish
delegate(
subscribe_connection,
publish_connection,
close_subscribe_connection,
close_publish_connection,
open_subscribe_connection,
publish_message,
subscribe,
unsubscribe,
ping_subscribe_connection,
ping_publish_connection,
to: @backend
)
end
end
10 changes: 5 additions & 5 deletions src/cable/redis_pinger.cr → src/cable/backend_pinger.cr
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
require "tasker"

module Cable
class RedisPinger
class BackendPinger
private getter task : Tasker::Task

def initialize(@server : Cable::Server)
@task = Tasker.every(Cable.settings.redis_ping_interval) do
@server.backend.ping_redis_subscribe
@server.backend.ping_redis_publish
@task = Tasker.every(Cable.settings.backend_ping_interval) do
@server.backend.ping_subscribe_connection
@server.backend.ping_publish_connection
rescue e
stop
Cable::Logger.error { "Cable::RedisPinger Exception: #{e.class.name} -> #{e.message}" }
Cable::Logger.error { "Cable::BackendPinger Exception: #{e.class.name} -> #{e.message}" }
# Restart cable if something happened
Cable.server.count_error!
Cable.restart if Cable.server.restart?
Expand Down
2 changes: 0 additions & 2 deletions src/cable/channel.cr
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module Cable
class Channel
class CloseRedisFiber < Exception; end

CHANNELS = {} of String => Cable::Channel.class

macro inherited
Expand Down
8 changes: 4 additions & 4 deletions src/cable/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ module Cable
getter connections = {} of String => Cable::Connection
getter errors = 0
getter fiber_channel = ::Channel({String, String}).new
getter pinger : Cable::RedisPinger do
Cable::RedisPinger.new(self)
getter pinger : Cable::BackendPinger do
Cable::BackendPinger.new(self)
end
getter backend : Cable::BackendCore do
Cable.settings.backend_class.new
Expand Down Expand Up @@ -112,7 +112,7 @@ module Cable
end
end

# redis only accepts strings, so we should be strict here
# Some backends only accept strings, so we should be strict here
def publish(channel : String, message : String)
backend.publish_message(channel, message)
end
Expand Down Expand Up @@ -169,7 +169,7 @@ module Cable
backend.close_publish_connection
rescue e : IO::Error
# the @writer IO is closed already
Cable::Logger.debug { "Cable::Server#shutdown Connection to redis was severed: #{e.message}" }
Cable::Logger.debug { "Cable::Server#shutdown Connection to backend was severed: #{e.message}" }
end
pinger.stop
connections.each do |_k, v|
Expand Down

0 comments on commit 4c57275

Please sign in to comment.