Skip to content

Commit

Permalink
Merge pull request joker1007#13 from joker1007/add-execution
Browse files Browse the repository at this point in the history
Add CronoTrigger::Models::Execution model for tracking execution
  • Loading branch information
joker1007 authored Mar 25, 2019
2 parents b72e94c + 7969322 commit f2cdad1
Show file tree
Hide file tree
Showing 18 changed files with 463 additions and 29 deletions.
1 change: 1 addition & 0 deletions lib/crono_trigger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require "concurrent"
require "crono_trigger/models/worker"
require "crono_trigger/models/signal"
require "crono_trigger/models/execution"
require "crono_trigger/worker"
require "crono_trigger/polling_thread"
require "crono_trigger/schedulable"
Expand Down
22 changes: 22 additions & 0 deletions lib/crono_trigger/execution_tracker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
module CronoTrigger
class ExecutionTracker
def initialize(schedulable)
@schedulable = schedulable
end

def track(&pr)
if @schedulable.track_execution
begin
execution = @schedulable.crono_trigger_executions.create_with_timestamp!
pr.call
execution.complete!
rescue => e
execution.error!(e)
raise
end
else
pr.call
end
end
end
end
35 changes: 35 additions & 0 deletions lib/crono_trigger/models/execution.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
module CronoTrigger
module Models
class Execution < ActiveRecord::Base
self.table_name = "crono_trigger_executions"

belongs_to :schedule, polymorphic: true, inverse_of: :crono_trigger_executions

scope :recently, ->(schedule_type:) { where(schedule_type: schedule_type).order(executed_at: :desc) }

enum status: {
executing: "executing",
completed: "completed",
failed: "failed",
}

def self.create_with_timestamp!
create!(executed_at: Time.current, status: :executing, worker_id: CronoTrigger.config.worker_id)
end

def complete!
update!(status: :completed, completed_at: Time.current)
end

def error!(exception)
update!(status: :failed, completed_at: Time.current, error_name: exception.class.to_s, error_reason: exception.message)
end

def retry!
return false if schedule.locking?

schedule.retry!
end
end
end
end
19 changes: 15 additions & 4 deletions lib/crono_trigger/schedulable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require "tzinfo"

require "crono_trigger/exception_handler"
require "crono_trigger/execution_tracker"

module CronoTrigger
module Schedulable
Expand All @@ -26,9 +27,12 @@ class AbortExecution < StandardError; end

included do
CronoTrigger::Schedulable.included_by << self
class_attribute :crono_trigger_options, :executable_conditions
class_attribute :crono_trigger_options, :executable_conditions, :track_execution
self.crono_trigger_options ||= {}
self.executable_conditions ||= []
self.track_execution ||= false

has_many :crono_trigger_executions, class_name: "CronoTrigger::Models::Execution", as: :schedule, inverse_of: :schedule

define_model_callbacks :execute, :retry

Expand Down Expand Up @@ -107,12 +111,15 @@ def clear_executable_conditions
end

def do_execute
execution_tracker = ExecutionTracker.new(self)
run_callbacks :execute do
catch(:ok_without_reset) do
catch(:ok) do
catch(:retry) do
catch(:abort) do
execute
execution_tracker.track do
execute
end
throw :ok
end
raise AbortExecution
Expand Down Expand Up @@ -159,12 +166,16 @@ def activate_schedule!(at: Time.current)
self
end

def retry!
def retry!(immediately: false)
run_callbacks :retry do
logger.info "Retry #{self.class}-#{id}" if logger

now = Time.current
wait = crono_trigger_options[:exponential_backoff] ? retry_interval * [2 * (retry_count - 1), 1].max : retry_interval
if immediately
wait = 0
else
wait = crono_trigger_options[:exponential_backoff] ? retry_interval * [2 * (retry_count - 1), 1].max : retry_interval
end
attributes = {
crono_trigger_column_name(:next_execute_at) => now + wait,
crono_trigger_column_name(:execute_lock) => 0,
Expand Down
72 changes: 56 additions & 16 deletions lib/crono_trigger/web.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ class Web < Sinatra::Application
end
end


post "/models/executions/:id/retry" do
CronoTrigger::Models::Execution.find(params[:id]).retry!
status 200
body ""
end

post "/models/:name/:id/retry" do
model_class = CronoTrigger::Schedulable.included_by.find { |c| c.name == params[:name] }
if model_class
Expand All @@ -111,22 +118,22 @@ class Web < Sinatra::Application
now = Time.now
records = @scheduled_records.map do |r|
{
"crono_trigger_status" => r.crono_trigger_status,
"id" => r.id,
"cron" => r[r.crono_trigger_column_name(:cron)],
"next_execute_at" => r[r.crono_trigger_column_name(:next_execute_at)],
"last_executed_at" => r[r.crono_trigger_column_name(:last_executed_at)],
"timezone" => r[r.crono_trigger_column_name(:timezone)],
"execute_lock" => r[r.crono_trigger_column_name(:execute_lock)],
"locked_by" => r[r.crono_trigger_column_name(:locked_by)],
"started_at" => r[r.crono_trigger_column_name(:started_at)],
"finished_at" => r[r.crono_trigger_column_name(:finished_at)],
"last_error_name" => r[r.crono_trigger_column_name(:last_error_name)],
"last_error_reason" => r[r.crono_trigger_column_name(:last_error_reason)],
"last_error_time" => r[r.crono_trigger_column_name(:last_error_time)],
"retry_count" => r[r.crono_trigger_column_name(:retry_count)],
"time_to_unlock" => [(r.class.execute_lock_timeout + r[r.crono_trigger_column_name(:execute_lock)]) - now.to_i, 0].max,
"delay_sec" => r.locking?(at: now) ? 0 : (now - r[r.crono_trigger_column_name(:next_execute_at)]).to_i,
-"crono_trigger_status" => r.crono_trigger_status,
-"id" => r.id,
-"cron" => r[r.crono_trigger_column_name(:cron)],
-"next_execute_at" => r[r.crono_trigger_column_name(:next_execute_at)],
-"last_executed_at" => r[r.crono_trigger_column_name(:last_executed_at)],
-"timezone" => r[r.crono_trigger_column_name(:timezone)],
-"execute_lock" => r[r.crono_trigger_column_name(:execute_lock)],
-"locked_by" => r[r.crono_trigger_column_name(:locked_by)],
-"started_at" => r[r.crono_trigger_column_name(:started_at)],
-"finished_at" => r[r.crono_trigger_column_name(:finished_at)],
-"last_error_name" => r[r.crono_trigger_column_name(:last_error_name)],
-"last_error_reason" => r[r.crono_trigger_column_name(:last_error_reason)],
-"last_error_time" => r[r.crono_trigger_column_name(:last_error_time)],
-"retry_count" => r[r.crono_trigger_column_name(:retry_count)],
-"time_to_unlock" => [(r.class.execute_lock_timeout + r[r.crono_trigger_column_name(:execute_lock)]) - now.to_i, 0].max,
-"delay_sec" => r.locking?(at: now) ? 0 : (now - r[r.crono_trigger_column_name(:next_execute_at)]).to_i,
}
end
Oj.dump({
Expand Down Expand Up @@ -160,5 +167,38 @@ class Web < Sinatra::Application
get "/models" do
erb :index
end

get "/models/:name/executions.:format" do
if params[:format] == "json"
model_class = CronoTrigger::Schedulable.included_by.find { |c| c.name == params[:name] }
if model_class
rel = CronoTrigger::Models::Execution.recently(schedule_type: model_class)
rel.where!("executed_at >= ?", Time.parse(params[:from])) if params[:from]
rel.where!("executed_at <= ?", Time.parse(params[:to])) if params[:to]
rel = rel.limit(params[:limit] || 100)
records = rel.map do |r|
{
-"id" => r.id,
-"schedule_id" => r.schedule_id,
-"schedule_type" => r.schedule_type,
-"worker_id" => r.worker_id,
-"executed_at" => r.executed_at,
-"completed_at" => r.completed_at,
-"status" => r.status,
-"error_name" => r.error_name,
-"error_reason" => r.error_reason,
}
end
Oj.dump({
records: records,
}, mode: :compat)
else
status 404
"Model Class is not found"
end
else
raise "unknown format"
end
end
end
end
15 changes: 15 additions & 0 deletions lib/generators/crono_trigger/install/templates/install.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,20 @@ def change
end

add_index :crono_trigger_signals, [:sent_at, :worker_id]

create_table :crono_trigger_executions do |t|
t.integer :schedule_id, null: false
t.string :schedule_type, null: false
t.string :worker_id, null: false
t.datetime :executed_at, null: false
t.datetime :completed_at
t.string :status, null: false, default: "executing"
t.string :error_name
t.string :error_reason
end

add_index :crono_trigger_executions, [:schedule_type, :schedule_id, :executed_at], name: "index_crono_trigger_executions_on_schtype_schid_executed_at"
add_index :crono_trigger_executions, [:schedule_type, :executed_at]
add_index :crono_trigger_executions, [:executed_at]
end
end
65 changes: 65 additions & 0 deletions spec/crono_trigger/models/execution_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
require "spec_helper"

RSpec.describe CronoTrigger::Models::Execution do
let(:notification) do
Notification.create!(
name: "notification1",
cron: "0,30 * * * *",
started_at: Time.current,
).tap(&:activate_schedule!)
end

describe ".create_with_timestamp", aggregate_failures: true do
subject { notification.crono_trigger_executions.create_with_timestamp! }

it "creates record with current time as executed_at" do
time = Time.utc(2017, 6, 18, 1, 0)
Timecop.freeze(time) do
is_expected.to be_persisted
expect(subject.executed_at).to eq(time)
expect(subject.status).to eq("executing")
expect(subject.worker_id).to eq(CronoTrigger.config.worker_id)
end
end
end

describe "#complete!" do
let(:execution) { notification.crono_trigger_executions.create_with_timestamp! }

it "update (status = completed, completed_at = now)" do
time1 = Time.utc(2017, 6, 18, 1, 0)
Timecop.freeze(time1) do
execution
end

time2 = Time.utc(2017, 6, 18, 2, 0)
Timecop.freeze(time2) do
execution.complete!
end

expect(execution.completed_at).to eq(time2)
expect(execution.status).to eq("completed")
end
end

describe "#error!" do
let(:execution) { notification.crono_trigger_executions.create_with_timestamp! }

it "update (status = failed, error_name = ex.class_name, error_reason = ex.message)" do
time1 = Time.utc(2017, 6, 18, 1, 0)
Timecop.freeze(time1) do
execution
end

time2 = Time.utc(2017, 6, 18, 2, 0)
Timecop.freeze(time2) do
execution.error!(RuntimeError.new("failed!!"))
end

expect(execution.completed_at).to eq(time2)
expect(execution.status).to eq("failed")
expect(execution.error_name).to eq("RuntimeError")
expect(execution.error_reason).to eq("failed!!")
end
end
end
15 changes: 15 additions & 0 deletions spec/db/migrate/00_create_crono_trigger_system_tables.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ def change
end

add_index :crono_trigger_signals, [:sent_at, :worker_id]

create_table :crono_trigger_executions do |t|
t.integer :schedule_id, null: false
t.string :schedule_type, null: false
t.string :worker_id, null: false
t.datetime :executed_at, null: false
t.datetime :completed_at
t.string :status, null: false, default: "executing"
t.string :error_name
t.string :error_reason
end

add_index :crono_trigger_executions, [:schedule_type, :schedule_id, :executed_at], name: "index_crono_trigger_executions_on_schtype_schid_executed_at"
add_index :crono_trigger_executions, [:schedule_type, :executed_at], name: "index_crono_trigger_executions_on_schtype_executed_at"
add_index :crono_trigger_executions, [:executed_at]
end
end
end
Loading

0 comments on commit f2cdad1

Please sign in to comment.