Output data to Elasticsearch.
:::tip
Engine Supported

- Supported ElasticSearch version is >= 2.x and <= 8.x
:::
name | type | required | default value |
---|---|---|---|
hosts | array | yes | - |
index | string | yes | - |
schema_save_mode | string | yes | CREATE_SCHEMA_WHEN_NOT_EXIST |
data_save_mode | string | yes | APPEND_DATA |
index_type | string | no | |
primary_keys | list | no | |
key_delimiter | string | no | _ |
username | string | no | |
password | string | no | |
max_retry_count | int | no | 3 |
max_batch_size | int | no | 10 |
tls_verify_certificate | boolean | no | true |
tls_verify_hostname | boolean | no | true |
tls_keystore_path | string | no | - |
tls_keystore_password | string | no | - |
tls_truststore_path | string | no | - |
tls_truststore_password | string | no | - |
common-options | | no | - |
### hosts [array]

Elasticsearch cluster http address, in the format `host:port`; multiple hosts can be specified, such as `["host1:9200", "host2:9200"]`.
### index [string]

Elasticsearch index name. The index supports field-name variables, such as `seatunnel_${age}`, and the field must appear in the seatunnel row. If it does not, the value is treated as a normal index name.
### index_type [string]

Elasticsearch index type. It is recommended not to specify it for Elasticsearch 6 and above.
### primary_keys [list]

Primary key fields used to generate the document `_id`. This option is required for CDC.
### key_delimiter [string]

Delimiter for composite keys (`_` by default). For example, `$` would produce a document `_id` like `KEY1$KEY2$KEY3`.
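As a minimal sketch, `primary_keys` and `key_delimiter` can be combined like this (the index and field names are illustrative placeholders, not part of any fixed schema):

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel_orders"
    # the document _id is built from these fields, joined by key_delimiter,
    # so a row with order_id = 1 and region = eu yields _id "1$eu"
    primary_keys = ["order_id", "region"]
    key_delimiter = "$"
  }
}
```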
### username [string]

x-pack username.
### password [string]

x-pack password.
### max_retry_count [int]

The maximum retry count for one bulk request.
### max_batch_size [int]

The maximum number of documents in one bulk request.
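For instance, batching and retry behaviour can be tuned together; the values below are illustrative, not recommendations:

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel_metrics"
    # send up to 1000 documents per bulk request,
    # and retry a failed bulk request up to 5 times
    max_batch_size = 1000
    max_retry_count = 5
  }
}
```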
### tls_verify_certificate [boolean]

Enable certificate validation for HTTPS endpoints.
### tls_verify_hostname [boolean]

Enable hostname validation for HTTPS endpoints.
### tls_keystore_path [string]

The path to the PEM or JKS key store. This file must be readable by the operating system user running SeaTunnel.

### tls_keystore_password [string]

The key password for the specified key store.
### tls_truststore_path [string]

The path to the PEM or JKS trust store. This file must be readable by the operating system user running SeaTunnel.

### tls_truststore_password [string]

The key password for the specified trust store.
### common-options

Sink plugin common parameters; please refer to Sink Common Options for details.
### schema_save_mode [string]

Before the synchronous task starts, choose how to handle the existing table structure on the target side.
Option introduction:

- `RECREATE_SCHEMA`: Create the table when it does not exist; drop and recreate it when it already exists
- `CREATE_SCHEMA_WHEN_NOT_EXIST`: Create the table when it does not exist; skip when it already exists
- `ERROR_WHEN_SCHEMA_NOT_EXIST`: Report an error when the table does not exist
### data_save_mode [string]

Before the synchronous task starts, choose how to handle existing data on the target side.
Option introduction:

- `DROP_DATA`: Preserve the database structure and delete the data
- `APPEND_DATA`: Preserve the database structure and keep the existing data
- `ERROR_WHEN_DATA_EXISTS`: Report an error when data already exists
### Simple

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel-${age}"
  }
}
```
### CDC (Change data capture) event

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel-${age}"
    # CDC-required options
    primary_keys = ["key1", "key2", ...]
  }
}
```
### SSL (Disable certificate validation)

```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
  }
}
```
### SSL (Disable hostname validation)

```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_hostname = false
  }
}
```
### SSL (Enable certificate validation)

```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
    tls_keystore_password = "${your password}"
  }
}
```
### SAVE_MODE

```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
  }
}
```
### Changelog

- Add Elasticsearch Sink Connector