Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow asynchronously cleanup journal #857

Merged
merged 1 commit into from
Oct 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### New Features

* [#857](https://github.com/toptal/chewy/pull/857): Allow passing `wait_for_completion`, `request_per_second` and `scroll_size` options to `chewy:journal:clean` rake task and `delete_all` query builder method. ([@konalegi][])([@barthez][])

### Changes

### Bugs Fixed
Expand Down
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,8 @@ You may be wondering why do you need it? The answer is simple: not to lose the d

Imagine that you reset your index in a zero-downtime manner (to separate index), and at the meantime somebody keeps updating the data frequently (to old index). So all these actions will be written to the journal index and you'll be able to apply them after index reset using the `Chewy::Journal` interface.

When enabled, journal can grow to enormous size, consider setting up cron job that would clean it occasionally using [`chewy:journal:clean` rake task](#chewyjournal).

### Index manipulation

```ruby
Expand Down Expand Up @@ -1144,6 +1146,17 @@ rake chewy:journal:apply["$(date -v-1H -u +%FT%TZ)"] # apply journaled changes f
rake chewy:journal:apply["$(date -v-1H -u +%FT%TZ)",users] # apply journaled changes for the past hour on UsersIndex only
```

When the size of the journal becomes very large, the classical way of deletion would be obstructive and resource consuming. Fortunately, Chewy internally uses [delete-by-query](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-delete-by-query.html#docs-delete-by-query-task-api) ES function which supports async execution with batching and [throttling](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html#docs-delete-by-query-throttle).

The available options, which can be set by ENV variables, are listed below:
* `WAIT_FOR_COMPLETION` - a boolean flag. It controls async execution. It waits by default. When set to `false` (`0`, `f`, `false` or `off` in any case spelling is accepted as `false`), Elasticsearch performs some preflight checks, launches the request, and returns a task reference you can use to cancel the task or get its status.
* `REQUESTS_PER_SECOND` - float. The throttle for this request in sub-requests per second. No throttling is enforced by default.
* `SCROLL_SIZE` - integer. The number of documents to be deleted in single sub-request. The default batch size is 1000.

```bash
rake chewy:journal:clean WAIT_FOR_COMPLETION=false REQUESTS_PER_SECOND=10 SCROLL_SIZE=5000
kamtop marked this conversation as resolved.
Show resolved Hide resolved
```

### RSpec integration

Just add `require 'chewy/rspec'` to your spec_helper.rb and you will get additional features:
Expand Down
8 changes: 6 additions & 2 deletions lib/chewy/journal.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ def apply(since_time, fetch_limit: 10, **import_options)
#
# @param until_time [Time, DateTime] time to clean up until it
# @return [Hash] delete_by_query ES API call result
def clean(until_time = nil)
Chewy::Stash::Journal.clean(until_time, only: @only)
def clean(until_time = nil, delete_by_query_options: {})
Chewy::Stash::Journal.clean(
until_time,
only: @only,
delete_by_query_options: delete_by_query_options.merge(refresh: false)
)
end

private
Expand Down
43 changes: 38 additions & 5 deletions lib/chewy/rake_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ module RakeHelper
output.puts " Applying journal to #{targets}, #{count} entries, stage #{payload[:stage]}"
end

DELETE_BY_QUERY_OPTIONS = %w[WAIT_FOR_COMPLETION REQUESTS_PER_SECOND SCROLL_SIZE].freeze
FALSE_VALUES = %w[0 f false off].freeze

class << self
# Performs zero-downtime reindexing of all documents for the specified indexes
#
Expand Down Expand Up @@ -162,7 +165,7 @@ def journal_apply(time: nil, only: nil, except: nil, output: $stdout)

subscribed_task_stats(output) do
output.puts "Applying journal entries created after #{time}"
count = Chewy::Journal.new(indexes_from(only: only, except: except)).apply(time)
count = Chewy::Journal.new(journal_indexes_from(only: only, except: except)).apply(time)
output.puts 'No journal entries were created after the specified time' if count.zero?
end
end
Expand All @@ -181,12 +184,16 @@ def journal_apply(time: nil, only: nil, except: nil, output: $stdout)
# @param except [Array<Chewy::Index, String>, Chewy::Index, String] indexes to exclude from processing
# @param output [IO] output io for logging
# @return [Array<Chewy::Index>] indexes that were actually updated
def journal_clean(time: nil, only: nil, except: nil, output: $stdout)
def journal_clean(time: nil, only: nil, except: nil, delete_by_query_options: {}, output: $stdout)
subscribed_task_stats(output) do
output.puts "Cleaning journal entries created before #{time}" if time
response = Chewy::Journal.new(indexes_from(only: only, except: except)).clean(time)
count = response['deleted'] || response['_indices']['_all']['deleted']
output.puts "Cleaned up #{count} journal entries"
response = Chewy::Journal.new(journal_indexes_from(only: only, except: except)).clean(time, delete_by_query_options: delete_by_query_options)
if response.key?('task')
output.puts "Task to cleanup the journal has been created, #{response['task']}"
else
count = response['deleted'] || response['_indices']['_all']['deleted']
kamtop marked this conversation as resolved.
Show resolved Hide resolved
output.puts "Cleaned up #{count} journal entries"
end
end
end

Expand Down Expand Up @@ -228,6 +235,26 @@ def update_mapping(name:, output: $stdout)
end
end

# Reads options that are required to run journal cleanup asynchronously from ENV hash
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
#
# @example
# Chewy::RakeHelper.delete_by_query_options_from_env({'WAIT_FOR_COMPLETION' => 'false','REQUESTS_PER_SECOND' => '10','SCROLL_SIZE' => '5000'})
# # => { wait_for_completion: false, requests_per_second: 10.0, scroll_size: 5000 }
#
def delete_by_query_options_from_env(env)
env
.slice(*DELETE_BY_QUERY_OPTIONS)
.transform_keys { |k| k.downcase.to_sym }
.to_h do |key, value|
case key
when :wait_for_completion then [key, !FALSE_VALUES.include?(value.downcase)]
when :requests_per_second then [key, value.to_f]
when :scroll_size then [key, value.to_i]
end
end
end

def normalize_indexes(*identifiers)
identifiers.flatten(1).map { |identifier| normalize_index(identifier) }
end
Expand All @@ -248,6 +275,12 @@ def subscribed_task_stats(output = $stdout, &block)

private

def journal_indexes_from(only: nil, except: nil)
return if Array.wrap(only).empty? && Array.wrap(except).empty?

indexes_from(only: only, except: except)
end

def indexes_from(only: nil, except: nil)
indexes = if only.present?
normalize_indexes(Array.wrap(only))
Expand Down
18 changes: 15 additions & 3 deletions lib/chewy/search/request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -962,10 +962,22 @@ def pluck(*fields)
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
# @note The result hash is different for different API used.
# @param refresh [true, false] field names
# @param refresh [true, false] Refreshes all shards involved in the delete by query
# @param wait_for_completion [true, false] wait for request completion or run it asynchronously
# and return task reference at `.tasks/task/${taskId}`.
# @param requests_per_second [Float] The throttle for this request in sub-requests per second
# @param scroll_size [Integer] Size of the scroll request that powers the operation

# @return [Hash] the result of query execution
def delete_all(refresh: true)
request_body = only(WHERE_STORAGES).render.merge(refresh: refresh)
def delete_all(refresh: true, wait_for_completion: nil, requests_per_second: nil, scroll_size: nil)
request_body = only(WHERE_STORAGES).render.merge(
{
refresh: refresh,
wait_for_completion: wait_for_completion,
requests_per_second: requests_per_second,
scroll_size: scroll_size
}.compact
)
ActiveSupport::Notifications.instrument 'delete_query.chewy', notification_payload(request: request_body) do
request_body[:body] = {query: {match_all: {}}} if request_body[:body].empty?
Chewy.client.delete_by_query(request_body)
Expand Down
6 changes: 3 additions & 3 deletions lib/chewy/stash.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ def self.entries(since_time, only: [])
# Cleans up all the journal entries until the specified time. If nothing is
# specified - cleans up everything.
#
# @param since_time [Time, DateTime] the time top boundary
# @param until_time [Time, DateTime] Clean everything before that date
# @param only [Chewy::Index, Array<Chewy::Index>] indexes to clean up journal entries for
def self.clean(until_time = nil, only: [])
def self.clean(until_time = nil, only: [], delete_by_query_options: {})
scope = self.for(only)
scope = scope.filter(range: {created_at: {lte: until_time}}) if until_time
scope.delete_all
scope.delete_all(**delete_by_query_options)
end

# Selects all the journal entries for the specified indices.
Expand Down
8 changes: 7 additions & 1 deletion lib/tasks/chewy.rake
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,13 @@ namespace :chewy do

desc 'Removes journal records created before the specified timestamp for the specified indexes/types or all of them'
task clean: :environment do |_task, args|
Chewy::RakeHelper.journal_clean(**parse_journal_args(args.extras))
delete_options = Chewy::RakeHelper.delete_by_query_options_from_env(ENV)
Chewy::RakeHelper.journal_clean(
[
parse_journal_args(args.extras),
{delete_by_query_options: delete_options}
].reduce({}, :merge)
)
end
end
end
68 changes: 68 additions & 0 deletions spec/chewy/rake_helper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,33 @@
described_class.journal_clean(except: CitiesIndex, output: output)
expect(output.string).to match(Regexp.new(<<-OUTPUT, Regexp::MULTILINE))
\\ACleaned up 1 journal entries
Total: \\d+s\\Z
OUTPUT
end

it 'executes asynchronously' do
output = StringIO.new
expect(Chewy.client).to receive(:delete_by_query).with(
{
body: {query: {match_all: {}}},
index: ['chewy_journal'],
refresh: false,
requests_per_second: 10.0,
scroll_size: 200,
wait_for_completion: false
}
).and_call_original
described_class.journal_clean(
output: output,
delete_by_query_options: {
wait_for_completion: false,
requests_per_second: 10.0,
scroll_size: 200
}
)

expect(output.string).to match(Regexp.new(<<-OUTPUT, Regexp::MULTILINE))
\\ATask to cleanup the journal has been created, [^\\n]*
Total: \\d+s\\Z
OUTPUT
end
Expand Down Expand Up @@ -502,4 +529,45 @@
end
end
end

describe '.delete_by_query_options_from_env' do
subject(:options) { described_class.delete_by_query_options_from_env(env) }
let(:env) do
{
'WAIT_FOR_COMPLETION' => 'false',
'REQUESTS_PER_SECOND' => '10',
kamtop marked this conversation as resolved.
Show resolved Hide resolved
'SCROLL_SIZE' => '5000'
}
end

it 'parses the options' do
expect(options).to eq(
wait_for_completion: false,
requests_per_second: 10.0,
scroll_size: 5000
)
end

context 'with different boolean values' do
it 'parses the option correctly' do
%w[1 t true TRUE on ON].each do |v|
expect(described_class.delete_by_query_options_from_env({'WAIT_FOR_COMPLETION' => v}))
.to eq(wait_for_completion: true)
end

%w[0 f false FALSE off OFF].each do |v|
expect(described_class.delete_by_query_options_from_env({'WAIT_FOR_COMPLETION' => v}))
.to eq(wait_for_completion: false)
end
end
end

context 'with other env' do
let(:env) { {'SOME_ENV' => '123', 'REQUESTS_PER_SECOND' => '15'} }

it 'parses only the options' do
expect(options).to eq(requests_per_second: 15.0)
end
end
end
end
25 changes: 25 additions & 0 deletions spec/chewy/search/request_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,31 @@
request: {index: ['products'], body: {query: {match: {name: 'name3'}}}, refresh: false}
)
end

it 'delete records asynchronously' do
outer_payload = nil
ActiveSupport::Notifications.subscribe('delete_query.chewy') do |_name, _start, _finish, _id, payload|
outer_payload = payload
end
subject.query(match: {name: 'name3'}).delete_all(
refresh: false,
wait_for_completion: false,
requests_per_second: 10.0,
scroll_size: 2000
)
expect(outer_payload).to eq(
index: ProductsIndex,
indexes: [ProductsIndex],
request: {
index: ['products'],
body: {query: {match: {name: 'name3'}}},
refresh: false,
wait_for_completion: false,
requests_per_second: 10.0,
scroll_size: 2000
}
)
end
end

describe '#response=' do
Expand Down