Skip to content

Commit

Permalink
Allow async options for delete_all method
Browse files Browse the repository at this point in the history
That enables aynchronous journal clean up.

Co-Authored-By: Bartek Bulat <bartek@toptal.com>
  • Loading branch information
konalegi and barthez committed Oct 6, 2022
1 parent 5ecbee3 commit 116a2fa
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### New Features

* [#857](https://github.com/toptal/chewy/pull/857): Allow passing `wait_for_completion`, `request_per_second` and `scroll_size` options to `chewy:journal:clean` rake task and `delete_all` query builder method. ([@konalegi][])([@barthez][])

### Changes

### Bugs Fixed
Expand Down
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,8 @@ You may be wondering why do you need it? The answer is simple: not to lose the d

Imagine that you reset your index in a zero-downtime manner (to separate index), and at the meantime somebody keeps updating the data frequently (to old index). So all these actions will be written to the journal index and you'll be able to apply them after index reset using the `Chewy::Journal` interface.

When enabled, journal can grow to enormous size, consider setting up cron job that would clean it occasionally using [`chewy:journal:clean` rake task](#chewyjournal).

### Index manipulation

```ruby
Expand Down Expand Up @@ -1144,6 +1146,17 @@ rake chewy:journal:apply["$(date -v-1H -u +%FT%TZ)"] # apply journaled changes f
rake chewy:journal:apply["$(date -v-1H -u +%FT%TZ)",users] # apply journaled changes for the past hour on UsersIndex only
```

For the cases when journal has grown up to enormous size, classical way of deletion is not quite possible. Fortunately chewy internally uses [delete-by-query](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-delete-by-query.html#docs-delete-by-query-task-api) ES function which support async execution with batching and [throttling](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html#docs-delete-by-query-throttle).

Available options are listed below, they can by set by ENV variables:
* `WAIT_FOR_COMPLETION` - boolean flag, options controls async execution, It waits by default, when set to `false`, Elasticsearch performs some preflight checks, launches the request, and returns a task you can use to cancel or get the status of the task
* `REQUESTS_PER_SECOND` - float, The throttle for this request in sub-requests per second
* `SCROLL_SIZE` - integer, Size of the scroll request that powers the operation

```bash
rake chewy:journal:clean WAIT_FOR_COMPLETION=false REQUESTS_PER_SECOND=10 SCROLL_SIZE=5000
```

### RSpec integration

Just add `require 'chewy/rspec'` to your spec_helper.rb and you will get additional features:
Expand Down
8 changes: 6 additions & 2 deletions lib/chewy/journal.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ def apply(since_time, fetch_limit: 10, **import_options)
#
# @param until_time [Time, DateTime] time to clean up until it
# @return [Hash] delete_by_query ES API call result
def clean(until_time = nil)
Chewy::Stash::Journal.clean(until_time, only: @only)
def clean(until_time = nil, delete_by_query_options: {})
Chewy::Stash::Journal.clean(
until_time,
only: @only,
delete_by_query_options: delete_by_query_options.merge(refresh: false)
)
end

private
Expand Down
43 changes: 38 additions & 5 deletions lib/chewy/rake_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ module RakeHelper
output.puts " Applying journal to #{targets}, #{count} entries, stage #{payload[:stage]}"
end

DELETE_BY_QUERY_OPTIONS = %w[WAIT_FOR_COMPLETION REQUESTS_PER_SECOND SCROLL_SIZE].freeze
FALSE_VALUES = %w[0 f false off].freeze

class << self
# Performs zero-downtime reindexing of all documents for the specified indexes
#
Expand Down Expand Up @@ -162,7 +165,7 @@ def journal_apply(time: nil, only: nil, except: nil, output: $stdout)

subscribed_task_stats(output) do
output.puts "Applying journal entries created after #{time}"
count = Chewy::Journal.new(indexes_from(only: only, except: except)).apply(time)
count = Chewy::Journal.new(journal_indexes_from(only: only, except: except)).apply(time)
output.puts 'No journal entries were created after the specified time' if count.zero?
end
end
Expand All @@ -181,12 +184,16 @@ def journal_apply(time: nil, only: nil, except: nil, output: $stdout)
# @param except [Array<Chewy::Index, String>, Chewy::Index, String] indexes to exclude from processing
# @param output [IO] output io for logging
# @return [Array<Chewy::Index>] indexes that were actually updated
def journal_clean(time: nil, only: nil, except: nil, output: $stdout)
def journal_clean(time: nil, only: nil, except: nil, delete_by_query_options: {}, output: $stdout)
subscribed_task_stats(output) do
output.puts "Cleaning journal entries created before #{time}" if time
response = Chewy::Journal.new(indexes_from(only: only, except: except)).clean(time)
count = response['deleted'] || response['_indices']['_all']['deleted']
output.puts "Cleaned up #{count} journal entries"
response = Chewy::Journal.new(journal_indexes_from(only: only, except: except)).clean(time, delete_by_query_options: delete_by_query_options)
if response.key?('task')
output.puts "Task to cleanup the journal has been created, #{response['task']}"
else
count = response['deleted'] || response['_indices']['_all']['deleted']
output.puts "Cleaned up #{count} journal entries"
end
end
end

Expand Down Expand Up @@ -228,6 +235,26 @@ def update_mapping(name:, output: $stdout)
end
end

# Reads options that are required to run journal cleanup asynchronously from ENV hash
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
#
# @example
# Chewy::RakeHelper.delete_by_query_options_from_env({'WAIT_FOR_COMPLETION' => 'false','REQUESTS_PER_SECOND' => '10','SCROLL_SIZE' => '5000'})
# # => { wait_for_completion: false, requests_per_second: 10.0, scroll_size: 5000 }
#
def delete_by_query_options_from_env(env)
env
.slice(*DELETE_BY_QUERY_OPTIONS)
.transform_keys { |k| k.downcase.to_sym }
.to_h do |key, value|
case key
when :wait_for_completion then [key, !FALSE_VALUES.include?(value.downcase)]
when :requests_per_second then [key, value.to_f]
when :scroll_size then [key, value.to_i]
end
end
end

def normalize_indexes(*identifiers)
identifiers.flatten(1).map { |identifier| normalize_index(identifier) }
end
Expand All @@ -248,6 +275,12 @@ def subscribed_task_stats(output = $stdout, &block)

private

def journal_indexes_from(only: nil, except: nil)
return if Array.wrap(only).empty? && Array.wrap(except).empty?

indexes_from(only: only, except: except)
end

def indexes_from(only: nil, except: nil)
indexes = if only.present?
normalize_indexes(Array.wrap(only))
Expand Down
18 changes: 15 additions & 3 deletions lib/chewy/search/request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -962,10 +962,22 @@ def pluck(*fields)
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
# @note The result hash is different for different API used.
# @param refresh [true, false] field names
# @param refresh [true, false] Refreshes all shards involved in the delete by query
# @param wait_for_completion [true, false] wait for request completion or run it asynchronously
# and return task reference at `.tasks/task/${taskId}`.
# @param requests_per_second [Float] The throttle for this request in sub-requests per second
# @param scroll_size [Integer] Size of the scroll request that powers the operation

# @return [Hash] the result of query execution
def delete_all(refresh: true)
request_body = only(WHERE_STORAGES).render.merge(refresh: refresh)
def delete_all(refresh: true, wait_for_completion: nil, requests_per_second: nil, scroll_size: nil)
request_body = only(WHERE_STORAGES).render.merge(
{
refresh: refresh,
wait_for_completion: wait_for_completion,
requests_per_second: requests_per_second,
scroll_size: scroll_size
}.compact
)
ActiveSupport::Notifications.instrument 'delete_query.chewy', notification_payload(request: request_body) do
request_body[:body] = {query: {match_all: {}}} if request_body[:body].empty?
Chewy.client.delete_by_query(request_body)
Expand Down
6 changes: 3 additions & 3 deletions lib/chewy/stash.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ def self.entries(since_time, only: [])
# Cleans up all the journal entries until the specified time. If nothing is
# specified - cleans up everything.
#
# @param since_time [Time, DateTime] the time top boundary
# @param until_time [Time, DateTime] Clean everything before that date
# @param only [Chewy::Index, Array<Chewy::Index>] indexes to clean up journal entries for
def self.clean(until_time = nil, only: [])
def self.clean(until_time = nil, only: [], delete_by_query_options: {})
scope = self.for(only)
scope = scope.filter(range: {created_at: {lte: until_time}}) if until_time
scope.delete_all
scope.delete_all(**delete_by_query_options)
end

# Selects all the journal entries for the specified indices.
Expand Down
8 changes: 7 additions & 1 deletion lib/tasks/chewy.rake
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,13 @@ namespace :chewy do

desc 'Removes journal records created before the specified timestamp for the specified indexes/types or all of them'
task clean: :environment do |_task, args|
Chewy::RakeHelper.journal_clean(**parse_journal_args(args.extras))
delete_options = Chewy::RakeHelper.delete_by_query_options_from_env(ENV)
Chewy::RakeHelper.journal_clean(
[
parse_journal_args(args.extras),
{delete_by_query_options: delete_options}
].reduce({}, :merge)
)
end
end
end
68 changes: 68 additions & 0 deletions spec/chewy/rake_helper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,33 @@
described_class.journal_clean(except: CitiesIndex, output: output)
expect(output.string).to match(Regexp.new(<<-OUTPUT, Regexp::MULTILINE))
\\ACleaned up 1 journal entries
Total: \\d+s\\Z
OUTPUT
end

it 'executes asynchronously' do
output = StringIO.new
expect(Chewy.client).to receive(:delete_by_query).with(
{
body: {query: {match_all: {}}},
index: ['chewy_journal'],
refresh: false,
requests_per_second: 10.0,
scroll_size: 200,
wait_for_completion: false
}
).and_call_original
described_class.journal_clean(
output: output,
delete_by_query_options: {
wait_for_completion: false,
requests_per_second: 10.0,
scroll_size: 200
}
)

expect(output.string).to match(Regexp.new(<<-OUTPUT, Regexp::MULTILINE))
\\ATask to cleanup the journal has been created, [^\\n]*
Total: \\d+s\\Z
OUTPUT
end
Expand Down Expand Up @@ -502,4 +529,45 @@
end
end
end

describe '.delete_by_query_options_from_env' do
subject(:options) { described_class.delete_by_query_options_from_env(env) }
let(:env) do
{
'WAIT_FOR_COMPLETION' => 'false',
'REQUESTS_PER_SECOND' => '10',
'SCROLL_SIZE' => '5000'
}
end

it 'parses the options' do
expect(options).to eq(
wait_for_completion: false,
requests_per_second: 10.0,
scroll_size: 5000
)
end

context 'with different boolean values' do
it 'parses the option correctly' do
%w[1 t true TRUE on ON].each do |v|
expect(described_class.delete_by_query_options_from_env({'WAIT_FOR_COMPLETION' => v}))
.to eq(wait_for_completion: true)
end

%w[0 f false FALSE off OFF].each do |v|
expect(described_class.delete_by_query_options_from_env({'WAIT_FOR_COMPLETION' => v}))
.to eq(wait_for_completion: false)
end
end
end

context 'with other env' do
let(:env) { {'SOME_ENV' => '123', 'REQUESTS_PER_SECOND' => '15'} }

it 'parses only the options' do
expect(options).to eq(requests_per_second: 15.0)
end
end
end
end
25 changes: 25 additions & 0 deletions spec/chewy/search/request_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,31 @@
request: {index: ['products'], body: {query: {match: {name: 'name3'}}}, refresh: false}
)
end

it 'delete records asynchronously' do
outer_payload = nil
ActiveSupport::Notifications.subscribe('delete_query.chewy') do |_name, _start, _finish, _id, payload|
outer_payload = payload
end
subject.query(match: {name: 'name3'}).delete_all(
refresh: false,
wait_for_completion: false,
requests_per_second: 10.0,
scroll_size: 2000
)
expect(outer_payload).to eq(
index: ProductsIndex,
indexes: [ProductsIndex],
request: {
index: ['products'],
body: {query: {match: {name: 'name3'}}},
refresh: false,
wait_for_completion: false,
requests_per_second: 10.0,
scroll_size: 2000
}
)
end
end

describe '#response=' do
Expand Down

0 comments on commit 116a2fa

Please sign in to comment.