Skip to content

Commit

Permalink
Schedule copay notification jobs in time intervals (#12964)
Browse files Browse the repository at this point in the history
* Schedule jobs using throttle technique

* Fix time in comment

* spread_interval -> job_interval

* Make comment more clear

* Add spec for batch processing

* Add debt related workers to codeowners

* Remove CODEOWNERS change
  • Loading branch information
Scott James authored Jun 15, 2023
1 parent 0cd2a5e commit 9f42f18
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 22 deletions.
31 changes: 11 additions & 20 deletions app/workers/copay_notifications/new_statement_notification_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,21 @@ class NewStatementNotificationJob

sidekiq_options retry: false

def self.throttle
return Sidekiq::Limiter.unlimited if Rails.env.test?

Sidekiq::Limiter.concurrent('new-copay-statements', 4, wait_timeout: 259_200, lock_timeout: 120)
end

LIMITER = throttle
MCP_NOTIFICATION_TEMPLATE = Settings.vanotify.services.dmc.template_id.vha_new_copay_statement_email
STATSD_KEY_PREFIX = 'api.copay_notifications.new_statement'

def perform(statement)
LIMITER.within_limit do
StatsD.increment("#{STATSD_KEY_PREFIX}.total")
mpi_response = get_mpi_profile(identifier: statement['veteranIdentifier'],
identifier_type: statement['identifierType'],
facility_id: statement['facilityNum'])

if mpi_response.ok?
StatsD.increment("#{STATSD_KEY_PREFIX}.mpi.success")
create_notification_email_job(vet360_id: mpi_response.profile.vet360_id, icn: mpi_response.profile.icn)
else
StatsD.increment("#{STATSD_KEY_PREFIX}.mpi.failure")
raise mpi_response.error
end
StatsD.increment("#{STATSD_KEY_PREFIX}.total")
mpi_response = get_mpi_profile(identifier: statement['veteranIdentifier'],
identifier_type: statement['identifierType'],
facility_id: statement['facilityNum'])

if mpi_response.ok?
StatsD.increment("#{STATSD_KEY_PREFIX}.mpi.success")
create_notification_email_job(vet360_id: mpi_response.profile.vet360_id, icn: mpi_response.profile.icn)
else
StatsD.increment("#{STATSD_KEY_PREFIX}.mpi.failure")
raise mpi_response.error
end
end

Expand Down
12 changes: 10 additions & 2 deletions app/workers/copay_notifications/parse_new_statements_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,22 @@ class ParseNewStatementsJob

sidekiq_options retry: false

# time (in seconds) between scheduling batch of jobs
JOB_INTERVAL = Settings.mcp.notifications.job_interval
# number of jobs to perform at next interval
BATCH_SIZE = Settings.mcp.notifications.batch_size

def perform(statements_json_byte)
StatsD.increment('api.copay_notifications.json_file.total')
# Decode and parse large json file (~60-90k objects)
statements_json = Oj.load(Base64.decode64(statements_json_byte))
unique_statements = statements_json.uniq { |statement| statement['veteranIdentifier'] }

unique_statements.each do |statement|
CopayNotifications::NewStatementNotificationJob.perform_async(statement)
unique_statements.each_with_index do |statement, index|
# For every BATCH_SIZE jobs, enqueue the next BATCH_SIZE amount of jobs JOB_INTERVAL seconds later
CopayNotifications::NewStatementNotificationJob.perform_in(
JOB_INTERVAL * (index / BATCH_SIZE), statement
)
end
end
end
Expand Down
3 changes: 3 additions & 0 deletions config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ mcp:
mock: false
mock_vista: false
api_key: abcd1234abcd1234abcd1234abcd1234abcd1234
notifications:
job_interval: 90
batch_size: 100

fsr:
prefill: true
Expand Down
3 changes: 3 additions & 0 deletions config/settings/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ mcp:
base_path: /base/path
service_name: VBS
mock: true
notifications:
job_interval: 90
batch_size: 100
dgi:
jwt:
public_key_path: modules/meb_api/spec/fixtures/dgi_public_test.pem
Expand Down
23 changes: 23 additions & 0 deletions spec/jobs/copay_notifications/parse_new_statements_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,28 @@
.to(1)
end
end

context 'batch processing' do
let(:statements_json_byte) do
Base64.encode64(File.read('spec/fixtures/medical_copays/new_statements.json'))
end
let(:job_interval) { Settings.mcp.notifications.job_interval }

before do
stub_const('CopayNotifications::ParseNewStatementsJob::BATCH_SIZE', 1)
end

it 'starts the jobs at different times' do
job = described_class.new
statement_json = Oj.load(Base64.decode64(statements_json_byte))
first_statement = statement_json[0]
expect(CopayNotifications::NewStatementNotificationJob).to receive(:perform_in).with(0, first_statement)

second_statement = statement_json[1]
expect(CopayNotifications::NewStatementNotificationJob).to receive(:perform_in).with(job_interval,
second_statement)
job.perform(statements_json_byte)
end
end
end
end

0 comments on commit 9f42f18

Please sign in to comment.