Skip to content
Merged

v2 #12

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
fetch-depth: 0 # Fetch current tag as annotated. See https://github.com/actions/checkout/issues/290
- uses: ruby/setup-ruby@v1
with:
ruby-version: 3.2
ruby-version: 3.4
- name: Configure RubyGems Credentials
uses: rubygems/configure-rubygems-credentials@main
- name: Publish to RubyGems
Expand Down
12 changes: 5 additions & 7 deletions .github/workflows/rspec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,21 @@ jobs:
strategy:
fail-fast: false
matrix:
ruby: ["3.2"]
ruby: ["3.4"]
gemfile: [
"gemfiles/rails71.gemfile",
"gemfiles/rails8.gemfile",
]
include:
- ruby: "3.4"
gemfile: "gemfiles/railsmaster.gemfile"
- ruby: "3.3"
gemfile: "gemfiles/rails8.gemfile"
- ruby: "3.1"
- ruby: "3.2"
gemfile: "gemfiles/rails7.gemfile"
- ruby: "3.0"
- ruby: "3.1"
gemfile: "gemfiles/rails7.gemfile"
- ruby: "2.7"
gemfile: "gemfiles/rails6.gemfile"
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4
- name: Install system deps
run: |
sudo apt-get update
Expand Down
6 changes: 5 additions & 1 deletion .rubocop-md.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@ AllCops:

Naming/FileName:
Exclude:
- '**/*.md'
- '**/*.md'

Lint/ConstantReassignment:
Exclude:
- '**/*.md'
1 change: 1 addition & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ AllCops:
DisplayCopNames: true
SuggestExtensions: false
NewCops: disable
TargetRubyVersion: 3.1

Standard/BlockSingleLineBraces:
Enabled: false
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## master

- Added `Downstream::Event.define` to use Data-backed event payloads.

## 1.6.0 (2025-02-14)

- Reset subscribers on code reloading to avoid double execution.
Expand Down
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,22 @@ Each event has predefined (_reserved_) fields:

Events are stored in `app/events` folder.

You can also define events using the Data-interface:

```ruby
ProfileCreated = Downstream::Event.define(:user)

# or with an explicit identifier
ProfileCreated = Downstream::Event.define(:user) do
self.identifier = "user.profile_created"
end
```

Date-events provide the same interface as regular events but use Data classes for keeping event payloads (`event.data`) and are frozen (as well as their derivatives, such as `event.to_h`).

> [!NOTE]
> Data-events are only available in Ruby 3.2+.

### Publish events

To publish an event you must first create an instance of the event class and call `Downstream.publish` method:
Expand Down
4 changes: 2 additions & 2 deletions downstream.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ Gem::Specification.new do |spec|

spec.files = Dir.glob("lib/**/*") + %w[LICENSE.txt README.md]
spec.require_paths = ["lib"]
spec.required_ruby_version = ">= 2.7"
spec.required_ruby_version = ">= 3.1"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Data class is available since 3.2

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, thanks!

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though the library can still be used with older Rubies (without data); we have a note in the Readme


spec.add_dependency "after_commit_everywhere", "~> 1.0"
spec.add_dependency "globalid", "~> 1.0"
spec.add_dependency "rails", ">= 6"
spec.add_dependency "rails", ">= 7"

spec.add_development_dependency "bundler", ">= 1.16"
spec.add_development_dependency "combustion", "~> 1.3"
Expand Down
6 changes: 0 additions & 6 deletions gemfiles/rails6.gemfile

This file was deleted.

4 changes: 2 additions & 2 deletions gemfiles/resmaster.gemfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
source 'https://rubygems.org'

gem 'sqlite3', '~> 1.4'
gem 'rails', '~> 7.0'
gem 'sqlite3', '~> 2.0'
gem 'rails', '~> 8.0'
gem 'rails_event_store', github: 'RailsEventStore/rails_event_store'

gemspec path: '..'
44 changes: 30 additions & 14 deletions lib/downstream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

require "downstream/config"
require "downstream/event"
require "downstream/data_event"
require "downstream/subscriber"
require "downstream/pubsub_adapters/abstract_pubsub"
require "downstream/subscriber_job"

Expand All @@ -27,18 +29,26 @@ def subscribe(subscriber = nil, to: nil, async: false, &block)
subscriber ||= block if block
raise ArgumentError, "Subsriber must be present" if subscriber.nil?

identifier = construct_identifier(subscriber, to)
construct_identifiers(subscriber, to).map do
pubsub.subscribe(_1, subscriber, async: async)
end.then do
next _1.first if _1.size == 1

pubsub.subscribe(identifier, subscriber, async: async)
_1
end
end

# temporary subscriptions
def subscribed(subscriber, to: nil, &block)
raise ArgumentError, "Subsriber must be present" if subscriber.nil?

identifier = construct_identifier(subscriber, to)
construct_identifiers(subscriber, to).map do
pubsub.subscribed(_1, subscriber, &block)
end.then do
next _1.first if _1.size == 1

pubsub.subscribed(identifier, subscriber, &block)
_1
end
end

def publish(event)
Expand All @@ -47,28 +57,34 @@ def publish(event)

private

def construct_identifier(subscriber, to)
to ||= infer_event_from_subscriber(subscriber) if subscriber.is_a?(Module)
def construct_identifiers(subscriber, to)
to ||= infer_events_from_subscriber(subscriber) if subscriber.is_a?(Module)

if to.nil?
raise ArgumentError, "Couldn't infer event from subscriber. " \
"Please, specify event using `to:` option"
end

identifier = if to.is_a?(Class) && Event >= to # rubocop:disable Style/YodaCondition
to.identifier
else
to
end
Array(to).map do
identifier = if _1.is_a?(Class) && Event >= _1 # rubocop:disable Style/YodaCondition
_1.identifier
else
_1
end

"#{config.namespace}.#{identifier}"
"#{config.namespace}.#{identifier}"
end
end

def infer_event_from_subscriber(subscriber)
def infer_events_from_subscriber(subscriber)
if subscriber.is_a?(Class) && Subscriber >= subscriber # rubocop:disable Style/YodaCondition
return subscriber.event_names
end

event_class_name = subscriber.name.split("::").yield_self do |parts|
# handle explicti top-level name, e.g. ::Some::Event
parts.shift if parts.first.empty?
# drop last part – it's a unique subscriber name
# drop last partit's a unique subscriber name
parts.pop

parts.last.sub!(/^On/, "")
Expand Down
32 changes: 32 additions & 0 deletions lib/downstream/data_event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# frozen_string_literal: true

module Downstream
class DataEvent < Event
class << self
attr_writer :data_class

def data_class
return @data_class if @data_class

@data_class = superclass.data_class
end

undef_method :attributes
undef_method :defined_attributes
end

def initialize(event_id: nil, **attrs)
@event_id = event_id || SecureRandom.hex(10)
@data = self.class.data_class.new(**attrs)
freeze
end

def to_h
{
type:,
event_id:,
data: data.to_h.freeze
}.freeze
end
end
end
13 changes: 13 additions & 0 deletions lib/downstream/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,19 @@ module Downstream
class Engine < ::Rails::Engine
config.downstream = Downstream.config

::GlobalID::Locator.use "downstream" do |gid|

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like a better place for this code.

params = gid.params.each_with_object({}) do |(key, value), memo|
memo[key.to_sym] = if value.is_a?(String) && value.start_with?("gid://")
GlobalID::Locator.locate(value)
else
value
end
end

gid.model_name.constantize
.new(event_id: gid.model_id, **params)
end

config.to_prepare do
Downstream.pubsub.reset
ActiveSupport.run_load_hooks("downstream-events", Downstream)
Expand Down
31 changes: 16 additions & 15 deletions lib/downstream/event.rb
Original file line number Diff line number Diff line change
@@ -1,18 +1,5 @@
# frozen_string_literal: true

GlobalID::Locator.use "downstream" do |gid|
params = gid.params.each_with_object({}) do |(key, value), memo|
memo[key.to_sym] = if value.is_a?(String) && value.start_with?("gid://")
GlobalID::Locator.locate(value)
else
value
end
end

gid.model_name.constantize
.new(event_id: gid.model_id, **params)
end

module Downstream
class Event
extend ActiveModel::Naming
Expand All @@ -26,7 +13,7 @@ class << self
def identifier
return @identifier if instance_variable_defined?(:@identifier)

@identifier = name.underscore.tr("/", ".")
@identifier = name.underscore.tr("/", ".").gsub(/_event$/, "")
end

# define store readers
Expand Down Expand Up @@ -56,6 +43,20 @@ def defined_attributes
end
end

def define(*fields, &)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

attributes and and define feel to me like different strategies to be applied or configured on the Event class.

I do like the new public method, but I think the public interface with this definition doesn't clearly state what's going on, that one variant is mutable and the other immutable. Perhaps the strategy could be defined through an attributes parameter, like immutable: true.

fields.each do |field|
raise ArgumentError, "#{field} is reserved" if RESERVED_ATTRIBUTES.include?(field)
end

data_class = ::Data.define(*fields)

Class.new(DataEvent, &).tap do
_1.data_class = data_class

_1.delegate(*fields, to: :data)
end
end

def i18n_scope
:activemodel
end
Expand Down Expand Up @@ -94,7 +95,7 @@ def to_h
end

def to_global_id
new_data = data.each_with_object({}) do |(key, value), memo|
new_data = data.to_h.each_with_object({}) do |(key, value), memo|
memo[key] = if value.respond_to?(:to_global_id)
value.to_global_id
else
Expand Down
2 changes: 1 addition & 1 deletion lib/downstream/rspec/have_published_event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def matches?(block)
end

def failure_message
(+"expected to publish #{event_class.identifier} event").tap do |msg|
"expected to publish #{event_class.identifier} event".tap do |msg|
msg << " #{message_expectation_modifier}, but haven't published"
end
end
Expand Down
32 changes: 32 additions & 0 deletions lib/downstream/subscriber.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# frozen_string_literal: true

module Downstream
class Subscriber
class << self
# All public names are considered event handlers
# (same concept as action_names in controllers/mailers)
def event_names
@event_names ||= begin
# All public instance methods of this class, including ancestors
methods = (public_instance_methods(true) -
# Except for public instance methods of Base and its ancestors
Downstream.public_instance_methods(true) +
# Be sure to include shadowed public instance methods of this class
public_instance_methods(false)).uniq.map(&:to_s)
methods.to_set
end
end

# Downstream subscriber interface
def call(event)
new.process_event(event)
end
end

def process_event(event)
# TODO: callbacks? instrumentation?
# TODO: namespaced events?
public_send(event.type, event)
end
end
end
Loading