Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom serializer #45

Merged
merged 7 commits into from
Jul 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ slides.md
/test/dummy
/test/log
/database.sqlite
/test/combustion/log/test.log
/test/combustion/database.sqlite
2 changes: 1 addition & 1 deletion lib/acidic_job/extensions/sidekiq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def serialize_job(*args, **kwargs)
arguments = args || @args
arguments += [kwargs] unless kwargs.empty?
normalized_args = ::Sidekiq.load_json(::Sidekiq.dump_json(arguments))
item = { "class" => self.class, "args" => normalized_args, "jid" => jid }
item = { "class" => self.class.name, "args" => normalized_args, "jid" => jid }
sidekiq_options = sidekiq_options_hash || {}

sidekiq_options.merge(item)
Expand Down
11 changes: 6 additions & 5 deletions lib/acidic_job/run.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "active_record"
require "global_id"
require "active_support/core_ext/object/with_options"
require_relative "./serializer"

module AcidicJob
class Run < ActiveRecord::Base
Expand All @@ -17,11 +18,11 @@ class Run < ActiveRecord::Base

after_create_commit :enqueue_staged_job, if: :staged?

serialize :error_object
serialize :serialized_job
serialize :workflow
serialize :returning_to
store :attr_accessors
serialize :serialized_job, JSON
serialize :error_object, Serializer
serialize :workflow, Serializer
serialize :returning_to, Serializer
store :attr_accessors, coder: Serializer

validates :staged, inclusion: { in: [true, false] } # uses database default
validates :serialized_job, presence: true
Expand Down
134 changes: 134 additions & 0 deletions lib/acidic_job/serializer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# frozen_string_literal: true

require "active_job/serializers"
require "active_job/arguments"
require "json"

class WorkerSerializer < ActiveJob::Serializers::ObjectSerializer
def serialize(worker)
# {"_aj_serialized"=>"WorkerSerializer", "class"=>"SuccessfulArgWorker", "args"=>[123], "kwargs"=>{}}]
super(
"class" => worker.class.name,
"args" => worker.instance_variable_get(:@__acidic_job_args),
"kwargs" => worker.instance_variable_get(:@__acidic_job_kwargs)
)
end

def deserialize(hash)
worker_class = hash["class"].constantize
worker_class.new(*hash["args"], **hash["kwargs"])
end

def serialize?(argument)
defined?(Sidekiq) && argument.class.include?(Sidekiq::Worker)
end
end

class ExceptionSerializer < ActiveJob::Serializers::ObjectSerializer
def serialize(exception)
hash = {
"class" => exception.class.name,
"message" => exception.message,
"cause" => exception.cause,
"backtrace" => {}
}

exception.backtrace.map do |trace|
path, _, location = trace.rpartition("/")

next if hash["backtrace"].key?(path)

hash["backtrace"][path] = location
end

super(hash)
end

def deserialize(hash)
exception_class = hash["class"].constantize
exception = exception_class.new(hash["message"])
exception.set_backtrace(hash["backtrace"].map do |path, location|
[path, location].join("/")
end)
exception
end

def serialize?(argument)
defined?(Exception) && argument.is_a?(Exception)
end
end

class FinishedPointSerializer < ActiveJob::Serializers::ObjectSerializer
def serialize(finished_point)
super(
"class" => finished_point.class.name
)
end

def deserialize(hash)
finished_point_class = hash["class"].constantize
finished_point_class.new
end

def serialize?(argument)
defined?(::AcidicJob::FinishedPoint) && argument.is_a?(::AcidicJob::FinishedPoint)
end
end

class RecoveryPointSerializer < ActiveJob::Serializers::ObjectSerializer
def serialize(recovery_point)
super(
"class" => recovery_point.class.name,
"name" => recovery_point.name
)
end

def deserialize(hash)
recovery_point_class = hash["class"].constantize
recovery_point_class.new(hash["name"])
end

def serialize?(argument)
defined?(::AcidicJob::RecoveryPoint) && argument.is_a?(::AcidicJob::RecoveryPoint)
end
end

ActiveJob::Serializers.add_serializers WorkerSerializer, ExceptionSerializer, FinishedPointSerializer,
RecoveryPointSerializer

# ...
module AcidicJob
module Arguments
include ActiveJob::Arguments
extend self # rubocop:disable Style/ModuleFunction

# `ActiveJob` will throw an error if it tries to deserialize a GlobalID record.
# However, this isn't the behavior that we want for our custom `ActiveRecord` serializer.
# Since `ActiveRecord` does _not_ reset instance record state to its pre-transactional state
# on a transaction ROLLBACK, we can have GlobalID entries in a serialized column that point to
# non-persisted records. This is ok. We should simply return `nil` for that portion of the
# serialized field.
def deserialize_global_id(hash)
GlobalID::Locator.locate hash[GLOBALID_KEY]
rescue ActiveRecord::RecordNotFound
nil
end
end

class Serializer
# Used for `serialize` method in ActiveRecord
class << self
def load(json)
return if json.nil? || json.empty?

data = JSON.parse(json)
Arguments.deserialize(data).first
end

def dump(obj)
data = Arguments.serialize [obj]
data.to_json
end
end
end
end
Binary file removed test/combustion/database.sqlite
Binary file not shown.
Loading