Commit 448f027

Merge pull request danielberkompas#27 from infinitered/26-bulk-configs
[danielberkompas#26] Configure bulk settings on indexes
2 parents b2d2e73 + 559efa6 commit 448f027

16 files changed, +393 -95 lines changed


README.md

Lines changed: 10 additions & 10 deletions

@@ -59,15 +59,6 @@ config :my_app, MyApp.ElasticsearchCluster,
   username: "username",
   password: "password",
 
-  # When indexing data using the `mix elasticsearch.build` task,
-  # control the data ingestion rate by raising or lowering the number
-  # of items to send in each bulk request.
-  bulk_page_size: 5000,
-
-  # Likewise, wait a given period between posting pages to give
-  # Elasticsearch time to catch up.
-  bulk_wait_interval: 15_000, # 15 seconds
-
   # If you want to mock the responses of the Elasticsearch JSON API
   # for testing or other purposes, you can inject a different module
   # here. It must implement the Elasticsearch.API behaviour.
@@ -100,7 +91,16 @@ config :my_app, MyApp.ElasticsearchCluster,
       #
       # Each piece of data that is returned by the store must implement the
       # Elasticsearch.Document protocol.
-      sources: [MyApp.Post]
+      sources: [MyApp.Post],
+
+      # When indexing data using the `mix elasticsearch.build` task,
+      # control the data ingestion rate by raising or lowering the number
+      # of items to send in each bulk request.
+      bulk_page_size: 5000,
+
+      # Likewise, wait a given period between posting pages to give
+      # Elasticsearch time to catch up.
+      bulk_wait_interval: 15_000 # 15 seconds
     }
   }
 ```

config/config.exs

Lines changed: 1 addition & 1 deletion

@@ -13,4 +13,4 @@ config :elasticsearch, Elasticsearch.Test.Repo,
 
 config :elasticsearch, ecto_repos: [Elasticsearch.Test.Repo]
 
-config :logger, level: :warn
+config :logger, level: :debug

guides/upgrading/0.3.x_to_0.4.x.md

Lines changed: 55 additions & 0 deletions

@@ -0,0 +1,55 @@
+# Upgrading from 0.3.x to 0.4.x
+
+Version `0.4.0` moved the `:bulk_page_size` and `:bulk_wait_interval` settings
+into each index's configuration, rather than global on the cluster.
+
+## Rationale
+
+This makes it easier to have different bulk settings for different indexes.
+
+## Changes
+
+**BREAKING**: `:bulk_page_size` and `:bulk_wait_interval` are now configured
+on the index, not the cluster.
+
+**FEATURE**: You can now pass `--bulk-wait-interval` and `--bulk-page-size`
+options to `mix elasticsearch.build`, as requested in [#26](https://github.com/infinitered/elasticsearch-elixir/issues/26).
+
+## How to Update Your App
+
+Move the `:bulk_page_size` and `:bulk_wait_interval` settings to your index
+rather than your entire cluster.
+
+    # BEFORE
+    config :my_app, MyApp.ElasticsearchCluster,
+      api: Elasticsearch.API.HTTP,
+      json_library: Poison,
+      url: "http://localhost:9200",
+      username: "username",
+      password: "password",
+      bulk_page_size: 5000,
+      bulk_wait_interval: 0,
+      indexes: %{
+        posts: %{
+          settings: "test/support/settings/posts.json",
+          store: Elasticsearch.Test.Store,
+          sources: [Post]
+        }
+      }
+
+    # AFTER
+    config :my_app, MyApp.ElasticsearchCluster,
+      api: Elasticsearch.API.HTTP,
+      json_library: Poison,
+      url: "http://localhost:9200",
+      username: "username",
+      password: "password",
+      indexes: %{
+        posts: %{
+          settings: "test/support/settings/posts.json",
+          store: Elasticsearch.Test.Store,
+          sources: [Post],
+          bulk_page_size: 5000,
+          bulk_wait_interval: 0
+        }
+      }

lib/elasticsearch/cluster/config.ex

Lines changed: 4 additions & 4 deletions

@@ -34,9 +34,7 @@ defmodule Elasticsearch.Cluster.Config do
             username: [presence: [unless: &(&1[:password] == nil)]],
             password: [presence: [unless: &(&1[:username] == nil)]],
             api: [presence: true, by: &is_module/1],
-            json_library: [by: &(is_nil(&1) || is_module(&1))],
-            bulk_page_size: [presence: true, by: &is_integer/1],
-            bulk_wait_interval: [presence: true, by: &is_integer/1]
+            json_library: [by: &(is_nil(&1) || is_module(&1))]
           ),
         :ok <- validate_indexes(config[:indexes] || %{}) do
      {:ok, config}
@@ -80,7 +78,9 @@
      sources: [
        presence: true,
        by: &(is_list(&1) && Enum.map(&1, fn source -> is_atom(source) end))
-      ]
+      ],
+      bulk_page_size: [presence: true, by: &is_integer/1],
+      bulk_wait_interval: [presence: true, by: &is_integer/1]
    )
  end
 end
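
With validation moved as shown above, each entry under `:indexes` must now carry its own integer bulk settings. A minimal sketch of a per-index map that the new checks would accept; the module and path names are illustrative, not taken from this commit:

```elixir
%{
  # Path to the index settings JSON file (illustrative path).
  settings: "priv/elasticsearch/posts.json",
  # A module implementing Elasticsearch.Store (illustrative name).
  store: MyApp.ElasticsearchStore,
  # Source modules whose records feed the index; must be a list of atoms.
  sources: [MyApp.Post],
  # Both are now validated per index and must be integers.
  bulk_page_size: 5000,
  bulk_wait_interval: 15_000
}
```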

lib/elasticsearch/indexing/bulk.ex

Lines changed: 18 additions & 11 deletions

@@ -79,28 +79,35 @@ defmodule Elasticsearch.Index.Bulk do
   """
   @spec upload(Cluster.t(), index_name :: String.t(), Elasticsearch.Store.t(), list) ::
           :ok | {:error, [map]}
-  def upload(cluster, index_name, store, sources, errors \\ [])
-  def upload(_cluster, _index_name, _store, [], []), do: :ok
-  def upload(_cluster, _index_name, _store, [], errors), do: {:error, errors}
-
-  def upload(cluster, index_name, store, [source | tail] = _sources, errors)
+  def upload(cluster, index_name, index_config, errors \\ [])
+  def upload(_cluster, _index_name, %{sources: []}, []), do: :ok
+  def upload(_cluster, _index_name, %{sources: []}, errors), do: {:error, errors}
+
+  def upload(
+        cluster,
+        index_name,
+        %{store: store, sources: [source | tail]} = index_config,
+        errors
+      )
       when is_atom(store) do
     config = Cluster.Config.get(cluster)
+    bulk_page_size = index_config[:bulk_page_size] || 5000
+    bulk_wait_interval = index_config[:bulk_wait_interval] || 0
 
     errors =
-      config
-      |> DataStream.stream(source, store)
+      source
+      |> DataStream.stream(store, bulk_page_size)
       |> Stream.map(&encode!(config, &1, index_name))
-      |> Stream.chunk_every(config.bulk_page_size)
-      |> Stream.intersperse(config.bulk_wait_interval)
+      |> Stream.chunk_every(bulk_page_size)
+      |> Stream.intersperse(bulk_wait_interval)
       |> Stream.map(&put_bulk_page(config, index_name, &1))
      |> Enum.reduce(errors, &collect_errors/2)
 
-    upload(cluster, index_name, store, tail, errors)
+    upload(config, index_name, %{index_config | sources: tail}, errors)
   end
 
   defp put_bulk_page(_config, _index_name, wait_interval) when is_integer(wait_interval) do
-    IO.puts("Pausing #{wait_interval}ms between bulk pages")
+    Logger.debug("Pausing #{wait_interval}ms between bulk pages")
     :timer.sleep(wait_interval)
   end
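
After this change, `Bulk.upload` receives the whole index config map rather than separate `store`/`sources` arguments, falling back to a page size of 5000 and a wait interval of 0 when those keys are absent. A rough sketch of the new call shape; the cluster, store, and index names are illustrative:

```elixir
index_config = %{
  store: MyApp.ElasticsearchStore,  # illustrative Elasticsearch.Store implementation
  sources: [MyApp.Post],            # source modules to stream and index
  bulk_page_size: 5000,             # omit to fall back to 5000
  bulk_wait_interval: 0             # omit to fall back to 0
}

# Streams each source through the store, encodes the documents, posts them in
# pages of `bulk_page_size`, and sleeps `bulk_wait_interval` ms between pages.
:ok = Elasticsearch.Index.Bulk.upload(MyApp.ElasticsearchCluster, "posts-1", index_config)
```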

lib/elasticsearch/indexing/index.ex

Lines changed: 8 additions & 12 deletions

@@ -23,24 +23,20 @@ defmodule Elasticsearch.Index do
 
       iex> file = "test/support/settings/posts.json"
       ...> store = Elasticsearch.Test.Store
-      ...> Index.hot_swap(Cluster, "posts", file, store, [Post])
+      ...> Index.hot_swap(Cluster, "posts", %{settings: file, store: store, sources: [Post]})
       :ok
   """
-  @spec hot_swap(
-          Cluster.t(),
-          alias :: String.t() | atom,
-          settings_path :: String.t(),
-          Elasticsearch.Store.t(),
-          list
-        ) ::
-          :ok
-          | {:error, Elasticsearch.Exception.t()}
-  def hot_swap(cluster, alias, settings_file, store, sources) do
+  @spec hot_swap(Cluster.t(), alias :: String.t() | atom, %{
+          settings: Path.t(),
+          store: module,
+          sources: [any]
+        }) :: :ok | {:error, Elasticsearch.Exception.t()}
+  def hot_swap(cluster, alias, %{settings: settings_file} = index_config) do
     name = build_name(alias)
     config = Config.get(cluster)
 
     with :ok <- create_from_file(config, name, settings_file),
-         :ok <- Bulk.upload(config, name, store, sources),
+         :ok <- Bulk.upload(config, name, index_config),
          :ok <- __MODULE__.alias(config, name, alias),
          :ok <- clean_starting_with(config, alias, 2),
          :ok <- refresh(config, name) do
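
In practice the map handed to `hot_swap/3` usually comes straight from the cluster configuration, which is how the Mix task below invokes it; a hedged sketch, with the cluster module and index key being illustrative:

```elixir
# Resolve the cluster configuration, then hot-swap the "posts" alias using the
# per-index settings, store, sources, and bulk options configured for it.
config = Elasticsearch.Cluster.Config.get(MyApp.ElasticsearchCluster)
:ok = Elasticsearch.Index.hot_swap(config, "posts", config.indexes[:posts])
```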

lib/elasticsearch/storage/data_stream.ex

Lines changed: 5 additions & 6 deletions

@@ -13,22 +13,21 @@ defmodule Elasticsearch.DataStream do
 
   ## Example
 
-      iex> stream = DataStream.stream(Cluster, MyApp.Schema, Elasticsearch.Test.Store)
+      iex> stream = DataStream.stream(MyApp.Schema, Elasticsearch.Test.Store, 100)
       ...> is_function(stream)
       true
 
   """
   @spec stream(Cluster.t(), source, Elasticsearch.Store.t()) :: Stream.t()
-  def stream(cluster, source, store) do
-    config = Cluster.Config.get(cluster)
-    Stream.resource(fn -> init(config) end, &next(&1, source, store), &finish/1)
+  def stream(source, store, bulk_page_size) do
+    Stream.resource(fn -> init(bulk_page_size) end, &next(&1, source, store), &finish/1)
   end
 
   # Store state in the following format:
   #
   # {items, offset, limit}
-  defp init(config) do
-    {[], 0, config.bulk_page_size}
+  defp init(bulk_page_size) do
+    {[], 0, bulk_page_size}
   end
 
   # If no items, load another page of items

lib/mix/elasticsearch.build.ex

Lines changed: 45 additions & 16 deletions

@@ -11,6 +11,16 @@ defmodule Mix.Tasks.Elasticsearch.Build do
   For a functional version of this approach, see
   `Elasticsearch.Index.hot_swap/4`.
 
+  ## Options
+
+  `--cluster`: The `Elasticsearch.Cluster` to build the indexes to.
+
+  `--bulk-page-size`: (Optional) The number of documents to post to the cluster in each
+  bulk page upload. Default: 5000
+
+  `--bulk-wait-interval`: (Optional) The number of milliseconds to wait between posting
+  each bulk page, to avoid overloading your cluster. Default: 0
+
   ## Example
 
       $ mix elasticsearch.build posts [index2] [index3] --cluster MyApp.Cluster
@@ -19,10 +29,16 @@ defmodule Mix.Tasks.Elasticsearch.Build do
 
       $ mix elasticsearch.build posts --existing --cluster MyApp.Cluster
       Index posts already exists.
+
+  You can also specify `--bulk-page-size` and `--bulk-wait-interval` manually:
+
+      $ mix elasticsearch.build posts --cluster MyApp.Cluster --bulk-page-size 1000 --bulk-wait-interval 500
   """
 
   require Logger
 
+  import Maybe
+
   alias Elasticsearch.{
     Cluster.Config,
     Index
@@ -32,31 +48,33 @@ defmodule Mix.Tasks.Elasticsearch.Build do
   def run(args) do
     Mix.Task.run("app.start", [])
 
-    {cluster, indexes, type} = parse_args!(args)
+    {type, cluster, indexes, settings} = parse_args!(args)
     config = Config.get(cluster)
 
     for alias <- indexes do
-      build(config, alias, type)
+      build(type, config, alias, settings)
     end
   end
 
-  defp build(config, alias, :existing) do
+  defp build(:existing, config, alias, settings) do
     case Index.latest_starting_with(config, alias) do
       {:ok, name} ->
         IO.puts("Index already exists: #{name}")
 
       {:error, :not_found} ->
-        build(config, alias, :rebuild)
+        build(:rebuild, config, alias, settings)
 
       {:error, exception} ->
-        Mix.raise(exception)
+        Mix.raise("""
+        Index could not be built.
+
+        #{inspect(exception)}
+        """)
     end
   end
 
-  defp build(config, alias, :rebuild) do
-    %{settings: settings, store: store, sources: sources} = config.indexes[alias]
-
-    with :ok <- Index.hot_swap(config, alias, settings, store, sources) do
+  defp build(:rebuild, config, alias, settings) do
+    with :ok <- Index.hot_swap(config, alias, Map.merge(config.indexes[alias], settings)) do
       :ok
     else
       {:error, errors} when is_list(errors) ->
@@ -66,12 +84,12 @@ defmodule Mix.Tasks.Elasticsearch.Build do
        Index created, but not aliased: #{alias}
        The following errors occurred:
 
-        #{errors}
+        #{errors}
        """)
 
      {:error, :enoent} ->
        Mix.raise("""
-        Schema file not found at #{settings}.
+        Settings file not found at #{maybe(config, [:indexes, alias, :settings])}.
        """)
 
      {:error, exception} ->
@@ -80,14 +98,20 @@ defmodule Mix.Tasks.Elasticsearch.Build do
 
        #{inspect(exception)}
        """)
-
-      error ->
-        Mix.raise(error)
    end
  end
 
  defp parse_args!(args) do
-    {options, indexes} = OptionParser.parse!(args, strict: [cluster: :string, existing: :boolean])
+    {options, indexes} =
+      OptionParser.parse!(
+        args,
+        strict: [
+          cluster: :string,
+          existing: :boolean,
+          bulk_page_size: :integer,
+          bulk_wait_interval: :integer
+        ]
+      )
 
    cluster =
      if options[:cluster] do
@@ -108,7 +132,12 @@ defmodule Mix.Tasks.Elasticsearch.Build do
 
    type = if options[:existing], do: :existing, else: :rebuild
 
-    {cluster, indexes, type}
+    settings =
+      options
+      |> Keyword.take([:bulk_page_size, :bulk_wait_interval])
+      |> Enum.into(%{})
+
+    {type, cluster, indexes, settings}
  end
 
  defp validate_indexes!(indexes, cluster) do
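
To make the option precedence concrete: `parse_args!/1` keeps only the bulk flags that were actually passed, and `build/4` merges them over the per-index config, so a CLI flag overrides the configured value while untouched keys pass through unchanged. A small illustration with made-up values:

```elixir
index_config = %{settings: "priv/posts.json", sources: [MyApp.Post], bulk_page_size: 5000, bulk_wait_interval: 0}
cli_settings = %{bulk_page_size: 1000}  # only --bulk-page-size was passed

Map.merge(index_config, cli_settings)
#=> %{settings: "priv/posts.json", sources: [MyApp.Post], bulk_page_size: 1000, bulk_wait_interval: 0}
```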

mix.exs

Lines changed: 5 additions & 3 deletions

@@ -6,7 +6,7 @@ defmodule Elasticsearch.Mixfile do
       app: :elasticsearch,
       description: "Elasticsearch without DSLs",
       source_url: "https://github.com/infinitered/elasticsearch-elixir",
-      version: "0.3.1",
+      version: "0.4.0",
       elixir: "~> 1.5",
       start_permanent: Mix.env() == :prod,
       elixirc_paths: elixirc_paths(Mix.env()),
@@ -61,6 +61,7 @@
       {:poison, ">= 0.0.0", optional: true},
       {:httpoison, ">= 0.0.0"},
       {:vex, "~> 0.6.0"},
+      {:maybe, "~> 1.0.0"},
       {:postgrex, ">= 0.0.0", only: [:dev, :test]},
       {:ex_doc, ">= 0.0.0", only: [:dev, :test]},
       {:ecto, ">= 0.0.0", only: [:dev, :test]},
@@ -73,8 +74,9 @@
       main: "readme",
       extras: [
         "README.md",
-        "guides/upgrading/0.1.x_to_0.2.x.md": [title: "0.1.x to 0.2.x"],
-        "guides/upgrading/0.2.x_to_0.3.x.md": [title: "0.2.x to 0.3.x"]
+        "guides/upgrading/0.3.x_to_0.4.x.md": [title: "0.3.x to 0.4.x"],
+        "guides/upgrading/0.2.x_to_0.3.x.md": [title: "0.2.x to 0.3.x"],
+        "guides/upgrading/0.1.x_to_0.2.x.md": [title: "0.1.x to 0.2.x"]
       ],
       extra_section: "GUIDES",
       groups_for_extras: [

mix.lock

Lines changed: 1 addition & 0 deletions

@@ -14,6 +14,7 @@
   "idna": {:hex, :idna, "5.1.0", "d72b4effeb324ad5da3cab1767cb16b17939004e789d8c0ad5b70f3cea20c89a", [], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"},
   "jason": {:hex, :jason, "1.0.0-rc.1", "c8421d4e6e6ef0dd7c2b64ff63589f8561116808fa003dddfd5360cde7bb4625", [], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
   "jsx": {:hex, :jsx, "2.8.3", "a05252d381885240744d955fbe3cf810504eb2567164824e19303ea59eef62cf", [], [], "hexpm"},
+  "maybe": {:hex, :maybe, "1.0.0", "65311dd7e16659579116666b268d03d7e1d1b3da8776c81a6b199de7177b43d6", [:mix], [], "hexpm"},
   "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [], [], "hexpm"},
   "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [], [], "hexpm"},
   "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
