diff --git a/src/sinks/axiom.rs b/src/sinks/axiom.rs index 830398997c875..fdbb654309538 100644 --- a/src/sinks/axiom.rs +++ b/src/sinks/axiom.rs @@ -9,7 +9,7 @@ use crate::{ SinkContext, }, sinks::{ - elasticsearch::{ElasticsearchAuth, ElasticsearchConfig}, + elasticsearch::{ElasticsearchApiVersion, ElasticsearchAuth, ElasticsearchConfig}, util::{http::RequestConfig, Compression}, Healthcheck, VectorSink, }, @@ -100,6 +100,7 @@ impl SinkConfig for AxiomConfig { query: Some(query), tls: self.tls.clone(), request, + api_version: ElasticsearchApiVersion::V6, ..Default::default() }; diff --git a/src/sinks/elasticsearch/common.rs b/src/sinks/elasticsearch/common.rs index 53ce6fe504698..ac660751bec40 100644 --- a/src/sinks/elasticsearch/common.rs +++ b/src/sinks/elasticsearch/common.rs @@ -2,12 +2,16 @@ use std::collections::HashMap; use aws_types::credentials::SharedCredentialsProvider; use aws_types::region::Region; -use bytes::Bytes; -use http::{StatusCode, Uri}; +use bytes::{Buf, Bytes}; +use http::{Response, StatusCode, Uri}; +use hyper::{body, Body}; +use serde::Deserialize; use snafu::ResultExt; +use vector_core::config::proxy::ProxyConfig; use super::{ - request_builder::ElasticsearchRequestBuilder, ElasticsearchEncoder, InvalidHostSnafu, Request, + request_builder::ElasticsearchRequestBuilder, ElasticsearchApiVersion, ElasticsearchEncoder, + InvalidHostSnafu, Request, }; use crate::{ http::{Auth, HttpClient, MaybeAuth}, @@ -38,7 +42,12 @@ pub struct ElasticsearchCommon { } impl ElasticsearchCommon { - pub async fn parse_config(config: &ElasticsearchConfig, endpoint: &str) -> crate::Result { + pub async fn parse_config( + config: &ElasticsearchConfig, + endpoint: &str, + proxy_config: &ProxyConfig, + version: &mut Option, + ) -> crate::Result { // Test the configured host, but ignore the result let uri = format!("{}/_test", endpoint); let uri = uri @@ -78,16 +87,6 @@ impl ElasticsearchCommon { let mode = config.common_mode()?; - let doc_type = config.doc_type.clone().unwrap_or_else(|| "_doc".into()); - let request_builder = ElasticsearchRequestBuilder { - compression: config.compression, - encoder: ElasticsearchEncoder { - transformer: config.encoding.clone(), - doc_type, - suppress_type_name: config.suppress_type_name, - }, - }; - let tower_request = config .request .tower @@ -103,11 +102,13 @@ impl ElasticsearchCommon { query_params.insert("pipeline".into(), pipeline.into()); } - let mut query = url::form_urlencoded::Serializer::new(String::new()); - for (p, v) in &query_params { - query.append_pair(&p[..], &v[..]); - } - let bulk_url = format!("{}/_bulk?{}", base_url, query.finish()); + let bulk_url = { + let mut query = url::form_urlencoded::Serializer::new(String::new()); + for (p, v) in &query_params { + query.append_pair(&p[..], &v[..]); + } + format!("{}/_bulk?{}", base_url, query.finish()) + }; let bulk_uri = bulk_url.parse::().unwrap(); let tls_settings = TlsSettings::from_options(&config.tls)?; @@ -122,6 +123,60 @@ impl ElasticsearchCommon { let region = config.aws.as_ref().and_then(|config| config.region()); + let version = if let Some(version) = *version { + version + } else { + let ver = match config.api_version { + ElasticsearchApiVersion::V6 => 6, + ElasticsearchApiVersion::V7 => 7, + ElasticsearchApiVersion::V8 => 8, + ElasticsearchApiVersion::Auto => { + match get_version( + &base_url, + &http_auth, + &aws_auth, + ®ion, + &request, + &tls_settings, + proxy_config, + ) + .await + { + Ok(version) => version, + // This error should be fatal, but for now we only emit it as a warning + // to make the transition smoother. + Err(error) => { + // For now, estimate version. + let assumed_version = match config.suppress_type_name { + Some(true) => 8, + _ => 6, + }; + warn!(message = "Failed to determine Elasticsearch version from `/_cluster/state/version`. Please fix the reported error or set an API version explicitly via `api_version`.",%assumed_version, %error); + assumed_version + } + } + } + }; + *version = Some(ver); + ver + }; + + let doc_type = config.doc_type.clone().unwrap_or_else(|| "_doc".into()); + let suppress_type_name = if let Some(suppress_type_name) = config.suppress_type_name { + warn!(message = "DEPRECATION, use of deprecated option `suppress_type_name`. Please use `api_version` option instead."); + suppress_type_name + } else { + version >= 7 + }; + let request_builder = ElasticsearchRequestBuilder { + compression: config.compression, + encoder: ElasticsearchEncoder { + transformer: config.encoding.clone(), + doc_type, + suppress_type_name, + }, + }; + Ok(Self { http_auth, base_url, @@ -138,11 +193,17 @@ impl ElasticsearchCommon { } /// Parses endpoints into a vector of ElasticsearchCommons. The resulting vector is guaranteed to not be empty. - pub async fn parse_many(config: &ElasticsearchConfig) -> crate::Result> { + pub async fn parse_many( + config: &ElasticsearchConfig, + proxy_config: &ProxyConfig, + ) -> crate::Result> { + let mut version = None; if let Some(endpoint) = config.endpoint.as_ref() { warn!(message = "DEPRECATION, use of deprecated option `endpoint`. Please use `endpoints` option instead."); if config.endpoints.is_empty() { - Ok(vec![Self::parse_config(config, endpoint).await?]) + Ok(vec![ + Self::parse_config(config, endpoint, proxy_config, &mut version).await?, + ]) } else { Err(ParseError::EndpointsExclusive.into()) } @@ -151,7 +212,8 @@ impl ElasticsearchCommon { } else { let mut commons = Vec::new(); for endpoint in config.endpoints.iter() { - commons.push(Self::parse_config(config, endpoint).await?); + commons + .push(Self::parse_config(config, endpoint, proxy_config, &mut version).await?); } Ok(commons) } @@ -160,30 +222,25 @@ impl ElasticsearchCommon { /// Parses a single endpoint, else panics. #[cfg(test)] pub async fn parse_single(config: &ElasticsearchConfig) -> crate::Result { - let mut commons = Self::parse_many(config).await?; - assert!(commons.len() == 1); + let mut commons = + Self::parse_many(config, crate::config::SinkContext::new_test().proxy()).await?; + assert_eq!(commons.len(), 1); Ok(commons.remove(0)) } pub async fn healthcheck(self, client: HttpClient) -> crate::Result<()> { - let mut builder = Request::get(format!("{}/_cluster/health", self.base_url)); - - if let Some(authorization) = &self.http_auth { - builder = authorization.apply_builder(builder); - } - - for (header, value) in &self.request.headers { - builder = builder.header(&header[..], &value[..]); - } - - let mut request = builder.body(Bytes::new())?; - - if let Some(credentials_provider) = &self.aws_auth { - sign_request(&mut request, credentials_provider, &self.region).await?; - } - let response = client.send(request.map(hyper::Body::from)).await?; - - match response.status() { + match get( + &self.base_url, + &self.http_auth, + &self.aws_auth, + &self.region, + &self.request, + client, + "/_cluster/health", + ) + .await? + .status() + { StatusCode::OK => Ok(()), status => Err(HealthcheckError::UnexpectedStatus { status }.into()), } @@ -197,3 +254,67 @@ pub async fn sign_request( ) -> crate::Result<()> { crate::aws::sign_request("es", request, credentials_provider, region).await } + +async fn get_version( + base_url: &str, + http_auth: &Option, + aws_auth: &Option, + region: &Option, + request: &RequestConfig, + tls_settings: &TlsSettings, + proxy_config: &ProxyConfig, +) -> crate::Result { + #[derive(Deserialize)] + struct ClusterState { + version: Option, + } + + let client = HttpClient::new(tls_settings.clone(), proxy_config)?; + let response = get( + base_url, + http_auth, + aws_auth, + region, + request, + client, + "/_cluster/state/version", + ) + .await + .map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?; + + let (_, body) = response.into_parts(); + let mut body = body::aggregate(body).await?; + let body = body.copy_to_bytes(body.remaining()); + let ClusterState { version } = serde_json::from_slice(&body)?; + version.ok_or_else(||"Unexpected response from Elasticsearch endpoint `/_cluster/state/version`. Missing `version`. Consider setting `api_version` option.".into()) +} + +async fn get( + base_url: &str, + http_auth: &Option, + aws_auth: &Option, + region: &Option, + request: &RequestConfig, + client: HttpClient, + path: &str, +) -> crate::Result> { + let mut builder = Request::get(format!("{}{}", base_url, path)); + + if let Some(authorization) = &http_auth { + builder = authorization.apply_builder(builder); + } + + for (header, value) in &request.headers { + builder = builder.header(&header[..], &value[..]); + } + + let mut request = builder.body(Bytes::new())?; + + if let Some(credentials_provider) = aws_auth { + sign_request(&mut request, credentials_provider, region).await?; + } + client + .send(request.map(hyper::Body::from)) + .await + .map_err(Into::into) +} diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs index a1955fdb6c64a..d411eb088a6b3 100644 --- a/src/sinks/elasticsearch/config.rs +++ b/src/sinks/elasticsearch/config.rs @@ -20,8 +20,8 @@ use crate::{ retry::ElasticsearchRetryLogic, service::{ElasticsearchService, HttpRequestBuilder}, sink::ElasticsearchSink, - BatchActionTemplateSnafu, ElasticsearchAuth, ElasticsearchCommon, - ElasticsearchCommonMode, ElasticsearchMode, IndexTemplateSnafu, + BatchActionTemplateSnafu, ElasticsearchApiVersion, ElasticsearchAuth, + ElasticsearchCommon, ElasticsearchCommonMode, ElasticsearchMode, IndexTemplateSnafu, }, util::{ http::RequestConfig, service::HealthConfig, BatchConfig, Compression, @@ -61,13 +61,19 @@ pub struct ElasticsearchConfig { /// set this option since Elasticsearch has removed it. pub doc_type: Option, + /// The API version of Elasticsearch. + #[serde(default)] + pub api_version: ElasticsearchApiVersion, + /// Whether or not to send the `type` field to Elasticsearch. /// /// `type` field was deprecated in Elasticsearch 7.x and removed in Elasticsearch 8.x. /// /// If enabled, the `doc_type` option will be ignored. - #[serde(default)] - pub suppress_type_name: bool, + /// + /// This option has been deprecated, the `api_version` option should be used instead. + #[configurable(deprecated)] + pub suppress_type_name: Option, /// Whether or not to retry successful requests containing partial failures. /// @@ -381,7 +387,7 @@ impl DataStreamConfig { #[async_trait::async_trait] impl SinkConfig for ElasticsearchConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - let commons = ElasticsearchCommon::parse_many(self).await?; + let commons = ElasticsearchCommon::parse_many(self, cx.proxy()).await?; let common = commons[0].clone(); let client = HttpClient::new(common.tls_settings.clone(), cx.proxy())?; @@ -491,4 +497,28 @@ mod tests { ) .unwrap(); } + + #[test] + fn parse_version() { + let config = toml::from_str::( + r#" + endpoints = [""] + api_version = "v7" + "#, + ) + .unwrap(); + assert_eq!(config.api_version, ElasticsearchApiVersion::V7); + } + + #[test] + fn parse_version_auto() { + let config = toml::from_str::( + r#" + endpoints = [""] + api_version = "auto" + "#, + ) + .unwrap(); + assert_eq!(config.api_version, ElasticsearchApiVersion::Auto); + } } diff --git a/src/sinks/elasticsearch/integration_tests.rs b/src/sinks/elasticsearch/integration_tests.rs index b8fc21f8f04f0..a291b3e90e899 100644 --- a/src/sinks/elasticsearch/integration_tests.rs +++ b/src/sinks/elasticsearch/integration_tests.rs @@ -110,7 +110,7 @@ async fn ensure_pipeline_in_params() { let pipeline = String::from("test-pipeline"); let config = ElasticsearchConfig { - endpoints: vec!["http://localhost:9200".to_string()], + endpoints: vec![http_server()], bulk: Some(BulkConfig { index: Some(index), action: None, @@ -205,6 +205,65 @@ async fn structures_events_correctly() { assert_eq!(&expected, value); } +#[tokio::test] +async fn auto_version_http() { + trace_init(); + + let config = ElasticsearchConfig { + endpoints: vec![http_server()], + doc_type: Some("log_lines".into()), + compression: Compression::None, + api_version: ElasticsearchApiVersion::Auto, + ..config() + }; + let _ = ElasticsearchCommon::parse_single(&config) + .await + .expect("Config error"); +} + +#[tokio::test] +async fn auto_version_https() { + trace_init(); + + let config = ElasticsearchConfig { + auth: Some(ElasticsearchAuth::Basic { + user: "elastic".to_string(), + password: "vector".to_string().into(), + }), + endpoints: vec![https_server()], + doc_type: Some("log_lines".into()), + compression: Compression::None, + tls: Some(TlsConfig { + ca_file: Some(tls::TEST_PEM_CA_PATH.into()), + ..Default::default() + }), + api_version: ElasticsearchApiVersion::Auto, + ..config() + }; + let _ = ElasticsearchCommon::parse_single(&config) + .await + .expect("Config error"); +} + +#[tokio::test] +async fn auto_version_aws() { + trace_init(); + + let config = ElasticsearchConfig { + auth: Some(ElasticsearchAuth::Aws(AwsAuthentication::Default { + load_timeout_secs: Some(5), + })), + endpoints: vec![aws_server()], + aws: Some(RegionOrEndpoint::with_region(String::from("localstack"))), + api_version: ElasticsearchApiVersion::Auto, + ..config() + }; + + let _ = ElasticsearchCommon::parse_single(&config) + .await + .expect("Config error"); +} + #[tokio::test] async fn insert_events_over_http() { trace_init(); @@ -258,6 +317,7 @@ async fn insert_events_on_aws() { })), endpoints: vec![aws_server()], aws: Some(RegionOrEndpoint::with_region(String::from("localstack"))), + api_version: ElasticsearchApiVersion::V6, ..config() }, false, @@ -278,6 +338,7 @@ async fn insert_events_on_aws_with_compression() { endpoints: vec![aws_server()], aws: Some(RegionOrEndpoint::with_region(String::from("localstack"))), compression: Compression::gzip_default(), + api_version: ElasticsearchApiVersion::V6, ..config() }, false, @@ -363,18 +424,30 @@ async fn distributed_insert_events() { async fn distributed_insert_events_failover() { trace_init(); - run_insert_tests( - ElasticsearchConfig { - // A valid endpoint and some random non elasticsearch endpoint - endpoints: vec![http_server(), "http://localhost:2347".into()], - doc_type: Some("log_lines".into()), - compression: Compression::None, - ..config() - }, - false, - BatchStatus::Delivered, - ) - .await; + let mut config = ElasticsearchConfig { + auth: Some(ElasticsearchAuth::Basic { + user: "elastic".into(), + password: "vector".to_string().into(), + }), + // Valid endpoints and some random non elasticsearch endpoint + endpoints: vec![ + http_server(), + https_server(), + "http://localhost:2347".into(), + ], + doc_type: Some("log_lines".into()), + compression: Compression::None, + tls: Some(TlsConfig { + ca_file: Some(tls::TEST_PEM_CA_PATH.into()), + ..Default::default() + }), + ..config() + }; + config.bulk = Some(BulkConfig { + index: Some(gen_index()), + action: None, + }); + run_insert_tests_with_multiple_endpoints(&config).await; } async fn run_insert_tests( @@ -409,10 +482,9 @@ async fn run_insert_tests_with_config( break_events: bool, batch_status: BatchStatus, ) { - let common = ElasticsearchCommon::parse_many(config) + let common = ElasticsearchCommon::parse_single(config) .await - .expect("Config error") - .remove(0); + .expect("Config error"); let index = match config.mode { // Data stream mode uses an index name generated from the event. ElasticsearchMode::DataStream => format!( @@ -511,7 +583,8 @@ async fn run_insert_tests_with_config( } async fn run_insert_tests_with_multiple_endpoints(config: &ElasticsearchConfig) { - let commons = ElasticsearchCommon::parse_many(config) + let cx = SinkContext::new_test(); + let commons = ElasticsearchCommon::parse_many(config, cx.proxy()) .await .expect("Config error"); let index = match config.mode { @@ -527,7 +600,6 @@ async fn run_insert_tests_with_multiple_endpoints(config: &ElasticsearchConfig) .unwrap(), }; - let cx = SinkContext::new_test(); let (sink, healthcheck) = config .build(cx.clone()) .await @@ -548,13 +620,13 @@ async fn run_insert_tests_with_multiple_endpoints(config: &ElasticsearchConfig) // make sure writes all all visible for common in commons { - flush(common).await.expect("Flushing writes failed"); + let _ = flush(common).await; } let client = create_http_client(); let mut total = 0; for base_url in base_urls { - let response = client + if let Ok(response) = client .get(&format!("{}/{}/_search", base_url, index)) .basic_auth("elastic", Some("vector")) .json(&json!({ @@ -562,22 +634,23 @@ async fn run_insert_tests_with_multiple_endpoints(config: &ElasticsearchConfig) })) .send() .await - .unwrap() - .json::() - .await - .unwrap(); - - let endpoint_total = response["hits"]["total"]["value"] - .as_u64() - .or_else(|| response["hits"]["total"].as_u64()) - .expect("Elasticsearch response does not include hits->total nor hits->total->value"); - - assert!( - input.len() as u64 > endpoint_total, - "One of the endpoints received all of the events." - ); - - total += endpoint_total; + { + let response = response.json::().await.unwrap(); + + let endpoint_total = response["hits"]["total"]["value"] + .as_u64() + .or_else(|| response["hits"]["total"].as_u64()) + .expect( + "Elasticsearch response does not include hits->total nor hits->total->value", + ); + + assert!( + input.len() as u64 > endpoint_total, + "One of the endpoints received all of the events." + ); + + total += endpoint_total; + } } assert_eq!(input.len() as u64, total); diff --git a/src/sinks/elasticsearch/mod.rs b/src/sinks/elasticsearch/mod.rs index bbda88b619d7a..8b1a3c42dbce6 100644 --- a/src/sinks/elasticsearch/mod.rs +++ b/src/sinks/elasticsearch/mod.rs @@ -171,6 +171,27 @@ impl ElasticsearchCommonMode { } } +/// Configuration for api version. +#[configurable_component] +#[derive(Clone, Debug, Eq, PartialEq)] +#[serde(deny_unknown_fields, rename_all = "snake_case")] +pub enum ElasticsearchApiVersion { + /// Auto-detect the api version. Will fail if endpoint isn't reachable. + Auto, + /// Use the Elasticsearch 6.x API. + V6, + /// Use the Elasticsearch 7.x API. + V7, + /// Use the Elasticsearch 8.x API. + V8, +} + +impl Default for ElasticsearchApiVersion { + fn default() -> Self { + Self::Auto + } +} + #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum ParseError { diff --git a/src/sinks/elasticsearch/tests.rs b/src/sinks/elasticsearch/tests.rs index bb519e553c332..b41c25d0efc03 100644 --- a/src/sinks/elasticsearch/tests.rs +++ b/src/sinks/elasticsearch/tests.rs @@ -5,8 +5,8 @@ use crate::{ event::{LogEvent, Metric, MetricKind, MetricValue, Value}, sinks::{ elasticsearch::{ - sink::process_log, BulkAction, BulkConfig, DataStreamConfig, ElasticsearchCommon, - ElasticsearchConfig, ElasticsearchMode, + sink::process_log, BulkAction, BulkConfig, DataStreamConfig, ElasticsearchApiVersion, + ElasticsearchCommon, ElasticsearchConfig, ElasticsearchMode, }, util::encoding::Encoder, }, @@ -26,6 +26,7 @@ async fn sets_create_action_when_configured() { index: Some(String::from("vector")), }), endpoints: vec![String::from("https://example.com")], + api_version: ElasticsearchApiVersion::V6, ..Default::default() }; let es = ElasticsearchCommon::parse_single(&config).await.unwrap(); @@ -74,6 +75,7 @@ async fn encode_datastream_mode() { }), endpoints: vec![String::from("https://example.com")], mode: ElasticsearchMode::DataStream, + api_version: ElasticsearchApiVersion::V6, ..Default::default() }; let es = ElasticsearchCommon::parse_single(&config).await.unwrap(); @@ -120,6 +122,7 @@ async fn encode_datastream_mode_no_routing() { namespace: Template::try_from("something").unwrap(), ..Default::default() }), + api_version: ElasticsearchApiVersion::V6, ..Default::default() }; let es = ElasticsearchCommon::parse_single(&config).await.unwrap(); @@ -155,6 +158,7 @@ async fn handle_metrics() { index: Some(String::from("vector")), }), endpoints: vec![String::from("https://example.com")], + api_version: ElasticsearchApiVersion::V6, ..Default::default() }; let es = ElasticsearchCommon::parse_single(&config).await.unwrap(); @@ -196,6 +200,7 @@ async fn decode_bulk_action_error() { index: Some(String::from("vector")), }), endpoints: vec![String::from("https://example.com")], + api_version: ElasticsearchApiVersion::V7, ..Default::default() }; let es = ElasticsearchCommon::parse_single(&config).await.unwrap(); @@ -215,6 +220,7 @@ async fn decode_bulk_action() { index: Some(String::from("vector")), }), endpoints: vec![String::from("https://example.com")], + api_version: ElasticsearchApiVersion::V7, ..Default::default() }; let es = ElasticsearchCommon::parse_single(&config).await.unwrap(); @@ -242,6 +248,7 @@ async fn encode_datastream_mode_no_sync() { sync_fields: false, ..Default::default() }), + api_version: ElasticsearchApiVersion::V6, ..Default::default() }; @@ -285,6 +292,7 @@ async fn allows_using_except_fields() { ) .unwrap(), endpoints: vec![String::from("https://example.com")], + api_version: ElasticsearchApiVersion::V6, ..Default::default() }; let es = ElasticsearchCommon::parse_single(&config).await.unwrap(); @@ -319,6 +327,7 @@ async fn allows_using_only_fields() { }), encoding: Transformer::new(Some(vec![owned_value_path!("foo")]), None, None).unwrap(), endpoints: vec![String::from("https://example.com")], + api_version: ElasticsearchApiVersion::V6, ..Default::default() }; let es = ElasticsearchCommon::parse_single(&config).await.unwrap(); diff --git a/src/sinks/sematext/logs.rs b/src/sinks/sematext/logs.rs index d9bf3d9f00e9e..3f19c62fd14dd 100644 --- a/src/sinks/sematext/logs.rs +++ b/src/sinks/sematext/logs.rs @@ -10,7 +10,7 @@ use crate::{ config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, event::EventArray, sinks::{ - elasticsearch::{BulkConfig, ElasticsearchConfig}, + elasticsearch::{BulkConfig, ElasticsearchApiVersion, ElasticsearchConfig}, util::{ http::RequestConfig, BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, StreamSink, TowerRequestConfig, @@ -98,6 +98,7 @@ impl SinkConfig for SematextLogsConfig { ..Default::default() }, encoding: self.encoding.clone(), + api_version: ElasticsearchApiVersion::V6, ..Default::default() } .build(cx) diff --git a/website/cue/reference/components/sinks/elasticsearch.cue b/website/cue/reference/components/sinks/elasticsearch.cue index 16efdbeb7d85f..7d5b5477da70c 100644 --- a/website/cue/reference/components/sinks/elasticsearch.cue +++ b/website/cue/reference/components/sinks/elasticsearch.cue @@ -74,6 +74,20 @@ components: sinks: elasticsearch: { } configuration: { + api_version: { + common: false + description: "The API version of Elasticsearch." + required: false + type: string: { + default: "auto" + enum: { + auto: "Auto-detect the api version. Will fail if version endpoint (`/_cluster/state/version`) isn't reachable." + v6: "Use the Elasticsearch 6.x API." + v7: "Use the Elasticsearch 7.x API." + v8: "Use the Elasticsearch 8.x API." + } + } + } auth: { common: false description: "Options for the authentication strategy." @@ -345,6 +359,7 @@ components: sinks: elasticsearch: { If enabled the `doc_type` option will be ignored. """ + warnings: ["This option has been deprecated, the `api_version` option should be used instead."] required: false type: bool: default: false }