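- A Search (Elasticsearch/OpenSearch) synchronization tool, refactored from medcl/esm.
- For cluster disaster recovery I need to continuously synchronize data between a primary and a standby cluster, so I evaluated several options:
  - CCR (cross-cluster replication): the official tool, but it requires a paid license, so I had to give it up :-(
  - esm: as its documentation says, its main strength is speed. Unfortunately I also found quite a few problems:
    - After a sync the data was incomplete, and it usually took several runs to fill the gaps.
    - On investigation, esm fetches the source data and then writes it to the target index through `bulk` from multiple goroutines, which causes a lot of identical data to be written to the target index repeatedly.
    - I changed the source code and added a `--sync` feature, which uses `scroll` to query the source and target index at the same time and compares their contents to perform incremental updates (add/update/delete). This PR/84 has been merged into esm.
    - The author of esm no longer maintains it, so bugs discovered later are hard to get fixed and merged.
    - Judging from the issues and the source code, esm does not support OpenSearch, and its author will not change that in the future.
- I modified the source code and strengthened the sync feature to meet my need for "near" real-time synchronization between two clusters:
  - Added a `--stamp` parameter. If the index has a field that records the last update time, it can further reduce the amount of data queried (in essence it uses esm's `--query` mechanism, split out so that it is easier to control).
  - Added support for `OpenSearch`.
  - Refactored the existing code substantially and fixed some bugs.
  - Wrote a general-purpose cluster sync script, `search_sync.sh`, which can sync the indexes in a cluster on demand and can also handle indexes that are generated automatically every month.
  - To make testing easier, docker-compose is used to set up source and target clusters of different versions and types.
  - Added
- At present the main changes are concentrated on `--sync`; the other features have not been tested much and may have problems.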
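The comparison behind `--sync` can be pictured as a merge of two `_id`-sorted streams. The sketch below is an illustration only and not esm's actual code: the function name `diff_sorted` and the `(id, source)` tuple shape are hypothetical, and it assumes both scrolls return documents sorted by `_id` under the same ordering.

```python
from typing import Iterable, Iterator, Optional, Tuple

Doc = Tuple[str, dict]  # hypothetical shape: (_id, _source)
Action = Tuple[str, str, Optional[dict]]


def diff_sorted(source: Iterable[Doc], target: Iterable[Doc]) -> Iterator[Action]:
    """Merge-compare two streams sorted by _id and yield the actions
    needed to make the target match the source."""
    src_iter, dst_iter = iter(source), iter(target)
    src = next(src_iter, None)
    dst = next(dst_iter, None)
    while src is not None or dst is not None:
        if dst is None or (src is not None and src[0] < dst[0]):
            # Present only in the source: add it to the target.
            yield ("index", src[0], src[1])
            src = next(src_iter, None)
        elif src is None or dst[0] < src[0]:
            # Present only in the target: remove it.
            yield ("delete", dst[0], None)
            dst = next(dst_iter, None)
        else:
            # Same _id on both sides: update only if the content differs.
            if src[1] != dst[1]:
                yield ("update", src[0], src[1])
            src = next(src_iter, None)
            dst = next(dst_iter, None)


if __name__ == "__main__":
    source_docs = [("1", {"v": 1}), ("2", {"v": 2}), ("4", {"v": 4})]
    target_docs = [("2", {"v": 0}), ("3", {"v": 3}), ("4", {"v": 4})]
    for action in diff_sorted(source_docs, target_docs):
        print(action)
```

Because each side is read once and unchanged documents produce no action, only the differing records would need to go into a bulk request, which is the point of the incremental approach described above.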
Elasticsearch cross version data migration.
Links:
- Dec 3rd, 2020: [EN] Cross version Elasticsearch data migration with ESM
- Use INFINI Gateway to check the Document-Level differences between two clusters or indices after the migration
Features:
- Cross version migration supported
- Overwrite index name
- Copy index settings and mapping
- Support HTTP basic auth
- Support dumping an index to a local file
- Support loading an index from a local file
- Support HTTP proxy
- Support sliced scroll (Elasticsearch 5.0+)
- Support running in the background
- Generate testing data by randomizing the source document ID
- Support renaming field names
- Support unifying the document type name
- Support specifying which `_source` fields to return from the source
- Support specifying a query string query to filter the data source
- Support renaming source fields while bulk indexing
- Support incremental updates (add/update/delete changed records) with `--sync`. Note: it uses a different implementation that handles only the changed records, but it is not as fast as the old way
- Load generating with
A 3-node cluster (3 * c5d.4xlarge, 16C, 32GB, 10Gbps):

```
root@ip-172-31-13-181:/tmp# ./esm -s https://localhost:8000 -d https://localhost:8000 -x logs1kw -y logs122 -m elastic:medcl123 -n elastic:medcl123 -w 40 --sliced_scroll_size=60 -b 5 --buffer_count=2000000 --regenerate_id
[12-19 06:31:20] [INF] [main.go:506,main] start data migration..
Scroll 10064570 / 10064570 [=================================================] 100.00% 55s
Bulk 10062602 / 10064570 [==================================================] 99.98% 55s
[12-19 06:32:15] [INF] [main.go:537,main] data migration finished.
```

About 10,000,000 documents were migrated within a minute. The data is Nginx logs generated from kibana_sample_data_logs.
Before running esm, manually prepare the target index with its mapping and with settings optimized for write speed, for example:

```
PUT your-new-index
{
  "settings": {
    "index.translog.durability": "async",
    "refresh_interval": "-1",
    "number_of_shards": 10,
    "number_of_replicas": 0
  }
}
```
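The same request can be prepared from a script. This is a minimal sketch using only Python's standard library; the helper name `build_create_index_request`, the base URL, and the default shard count are illustrative assumptions, and the request is only constructed here, not sent.

```python
import json
import urllib.request


def build_create_index_request(base_url, index, shards=10):
    """Build (but do not send) a PUT request that creates `index`
    with the bulk-load-friendly settings shown above."""
    body = {
        "settings": {
            "index.translog.durability": "async",
            "refresh_interval": "-1",
            "number_of_shards": shards,
            "number_of_replicas": 0,
        }
    }
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/{index}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Assumed local endpoint; replace with the real target cluster.
req = build_create_index_request("http://localhost:9200", "your-new-index")
# urllib.request.urlopen(req) would send it to a reachable cluster.
```

Settings such as `refresh_interval` and `number_of_replicas` are typically set back to values suited to serving traffic once the migration has finished.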
Copy index `index_name` from 192.168.1.x to 192.168.1.y:9200

```
./bin/esm -s http://192.168.1.x:9200 -d http://192.168.1.y:9200 -x index_name -w=5 -b=10 -c 10000
```

Copy index `src_index` from 192.168.1.x to 192.168.1.y:9200 and save it as `dest_index`

```
./bin/esm -s http://localhost:9200 -d http://localhost:9200 -x src_index -y dest_index -w=5 -b=100
```

Use the sync feature to incrementally update index `src_index` from 192.168.1.x to 192.168.1.y:9200

```
./bin/esm --sync -s http://localhost:9200 -d http://localhost:9200 -x src_index -y dest_index
```
Use basic auth

```
./bin/esm -s http://localhost:9200 -x "src_index" -y "dest_index" -d http://localhost:9201 -n admin:111111
```

Copy settings and override the number of shards

```
./bin/esm -s http://localhost:9200 -x "src_index" -y "dest_index" -d http://localhost:9201 -m admin:111111 -c 10000 --shards=50 --copy_settings
```

Copy settings and mappings, recreate the target index, add a query to the source fetch, and refresh after migration

```
./bin/esm -s http://localhost:9200 -x "src_index" -q=query:phone -y "dest_index" -d http://localhost:9201 -c 10000 --shards=5 --copy_settings --copy_mappings --force --refresh
```
Dump Elasticsearch documents into a local file

```
./bin/esm -s http://localhost:9200 -x "src_index" -m admin:111111 -c 5000 -q=query:mixer --refresh -o=dump.bin
```

Dump the source and target indexes to local files and compare them to find the differences quickly

```
./bin/esm --sort=_id -s http://localhost:9200 -x "src_index" --truncate_output --skip=_index -o=src.json
./bin/esm --sort=_id -s http://localhost:9200 -x "dst_index" --truncate_output --skip=_index -o=dst.json
diff -W 200 -ry --suppress-common-lines src.json dst.json
```

Load data from dump files and bulk insert it into another Elasticsearch instance

```
./bin/esm -d http://localhost:9200 -y "dest_index" -n admin:111111 -c 5000 -b 5 --refresh -i=dump.bin
```

Use a proxy

```
./bin/esm -d http://123345.ap-northeast-1.aws.found.io:9200 -y "dest_index" -n admin:111111 -c 5000 -b 1 --refresh -i dump.bin --dest_proxy=http://127.0.0.1:9743
```
Use sliced scroll (available in Elasticsearch 5.0 and later) to speed up scrolling, and update the number of shards

```
./bin/esm -s=http://192.168.3.206:9200 -d=http://localhost:9200 -n=elastic:changeme -f --copy_settings --copy_mappings -x=bestbuykaggle --sliced_scroll_size=5 --shards=50 --refresh
```

Migrate 5.x to 6.x and unify all the types to `doc`

```
./esm -s http://source_es:9200 -x "source_index*" -u "doc" -w 10 -b 10 -t "10m" -d https://target_es:9200 -m elastic:passwd -n elastic:passwd -c 5000
```

To migrate to version 7.x, you may need to rename `_type` to `_doc`

```
./esm -s http://localhost:9201 -x "source" -y "target" -d https://localhost:9200 --rename="_type:type,age:myage" -u "_doc"
```
Filter the migration with a range query

```
./esm -s https://192.168.3.98:9200 -m elastic:password -o json.out -x kibana_sample_data_ecommerce -q "order_date:[2020-02-01T21:59:02+00:00 TO 2020-03-01T21:59:02+00:00]"
```

Range query on a keyword field, with escaped quotes

```
./esm -s https://192.168.3.98:9200 -m test:123 -o 1.txt -x test1 -q "@timestamp.keyword:[\"2021-01-17 03:41:20\" TO \"2021-03-17 03:41:20\"]"
```
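Hand-escaping the quotes in a range query is error-prone, so the query string and the shell command can be generated instead. The following is a small sketch under stated assumptions: `range_query` and `esm_dump_command` are hypothetical helper names, the timestamp format matches the keyword example above, and only the `-s`, `-x`, `-o` and `-q` options already shown in this README are used.

```python
import shlex
from datetime import datetime


def range_query(field, start, end, fmt="%Y-%m-%d %H:%M:%S"):
    """Build a query-string range clause with quoted bounds."""
    return f'{field}:["{start.strftime(fmt)}" TO "{end.strftime(fmt)}"]'


def esm_dump_command(source, index, query, output):
    """Assemble a shell-safe esm command line; shlex.quote handles the
    quotes, brackets and spaces inside the query."""
    args = ["./esm", "-s", source, "-x", index, "-o", output, "-q", query]
    return " ".join(shlex.quote(arg) for arg in args)


query = range_query(
    "@timestamp.keyword",
    datetime(2021, 1, 17, 3, 41, 20),
    datetime(2021, 3, 17, 3, 41, 20),
)
print(esm_dump_command("https://192.168.3.98:9200", "test1", query, "1.txt"))
```

The generated command wraps the query in single quotes, so the inner double quotes need no backslashes.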
Generate testing data: if `input.json` contains 10 documents, the following command will ingest 100 documents, which is handy for testing

```
./bin/esm -i input.json -d http://localhost:9201 -y target-index1 --regenerate_id --repeat_times=10
```

Select source fields

```
./bin/esm -s http://localhost:9201 -x my_index -o dump.json --fields=author,title
```

Rename fields while bulk indexing

```
./bin/esm -i dump.json -d http://localhost:9201 -y target-index41 --rename=title:newtitle
```

Use `--buffer_count` to control the memory used by esm, and use gzip to compress network traffic

```
./esm -s https://localhost:8000 -d https://localhost:8000 -x logs1kw -y logs122 -m elastic:medcl123 -n elastic:medcl123 --regenerate_id -w 20 --sliced_scroll_size=60 -b 5 --buffer_count=1000000 --compress
```
https://github.com/medcl/esm/releases
If the downloaded version does not fit your environment, you can try to compile it yourself. Go is required.

```
make build
```

- Go version >= 1.7
```
Usage:
  esm [OPTIONS]

Application Options:
  -s, --source=                    source elasticsearch instance, ie: http://localhost:9200
  -q, --query=                     query against source elasticsearch instance, filter data before migrate, ie: name:medcl
      --sort=                      sort field when scroll, ie: _id (default: _id)
  -d, --dest=                      destination elasticsearch instance, ie: http://localhost:9201
  -m, --source_auth=               basic auth of source elasticsearch instance, ie: user:pass
  -n, --dest_auth=                 basic auth of target elasticsearch instance, ie: user:pass
  -c, --count=                     number of documents at a time: ie "size" in the scroll request (10000)
      --buffer_count=              number of buffered documents in memory (100000)
  -w, --workers=                   concurrency number for bulk workers (1)
  -b, --bulk_size=                 bulk size in MB (5)
  -t, --time=                      scroll time (1m)
      --sliced_scroll_size=        size of sliced scroll, to make it work, the size should be > 1 (1)
  -f, --force                      delete destination index before copying
  -a, --all                        copy indexes starting with . and _
      --copy_settings              copy index settings from source
      --copy_mappings              copy index mappings from source
      --shards=                    set a number of shards on newly created indexes
  -x, --src_indexes=               indexes name to copy,support regex and comma separated list (_all)
  -y, --dest_index=                indexes name to save, allow only one indexname, original indexname will be used if not specified
  -u, --type_override=             override type name
      --green                      wait for both hosts cluster status to be green before dump. otherwise yellow is okay
  -v, --log=                       setting log level,options:trace,debug,info,warn,error (INFO)
  -o, --output_file=               output documents of source index into local file
      --truncate_output=           truncate before dump to output file
  -i, --input_file=                indexing from local dump file
      --input_file_type=           the data type of input file, options: dump, json_line, json_array, log_line (dump)
      --source_proxy=              set proxy to source http connections, ie: http://127.0.0.1:8080
      --dest_proxy=                set proxy to target http connections, ie: http://127.0.0.1:8080
      --refresh                    refresh after migration finished
      --sync=                      sync will use scroll for both source and target index, compare the data and sync(index/update/delete)
      --fields=                    filter source fields(white list), comma separated, ie: col1,col2,col3,...
      --skip=                      skip source fields(black list), comma separated, ie: col1,col2,col3,...
      --rename=                    rename source fields, comma separated, ie: _type:type, name:myname
  -l, --logstash_endpoint=         target logstash tcp endpoint, ie: 127.0.0.1:5055
      --secured_logstash_endpoint  target logstash tcp endpoint was secured by TLS
      --repeat_times=              repeat the data from source N times to dest output, use align with parameter regenerate_id to amplify the data size
  -r, --regenerate_id              regenerate id for documents, this will override the exist document id in data source
      --compress                   use gzip to compress traffic
  -p, --sleep=                     sleep N seconds after finished a bulk request (-1)

Help Options:
  -h, --help                       Show this help message
```
- Scroll ID too long: update `elasticsearch.yml` on the source cluster.

```
http.max_header_size: 16k
http.max_initial_line_length: 8k
```
From | To |
---|---|
1.x | 1.x |
1.x | 2.x |
1.x | 5.x |
1.x | 6.x |
1.x | 7.x |
2.x | 1.x |
2.x | 2.x |
2.x | 5.x |
2.x | 6.x |
2.x | 7.x |
5.x | 1.x |
5.x | 2.x |
5.x | 5.x |
5.x | 6.x |
5.x | 7.x |
6.x | 1.x |
6.x | 2.x |
6.x | 5.x |
6.x | 6.x |
6.x | 7.x |
7.x | 1.x |
7.x | 2.x |
7.x | 5.x |
7.x | 6.x |
7.x | 7.x |