Skip to content

Commit

Permalink
enhancement(elasticsearch sink): Reintroduce api_version option (#1…
Browse files Browse the repository at this point in the history
…5082)

Reintroduces #14918 that was reverted in #15006. It turned out that this is a breaking change so it was decided to not abort Vector on failure to detect Elasticsearch version. Instead we log a warning and estimate the version. This will make the transition smoother. In a future version this can be changed to fatal error.
  • Loading branch information
ktff authored Nov 9, 2022
1 parent 225eac5 commit 0719f34
Show file tree
Hide file tree
Showing 8 changed files with 358 additions and 87 deletions.
3 changes: 2 additions & 1 deletion src/sinks/axiom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
SinkContext,
},
sinks::{
elasticsearch::{ElasticsearchAuth, ElasticsearchConfig},
elasticsearch::{ElasticsearchApiVersion, ElasticsearchAuth, ElasticsearchConfig},
util::{http::RequestConfig, Compression},
Healthcheck, VectorSink,
},
Expand Down Expand Up @@ -100,6 +100,7 @@ impl SinkConfig for AxiomConfig {
query: Some(query),
tls: self.tls.clone(),
request,
api_version: ElasticsearchApiVersion::V6,
..Default::default()
};

Expand Down
205 changes: 163 additions & 42 deletions src/sinks/elasticsearch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ use std::collections::HashMap;

use aws_types::credentials::SharedCredentialsProvider;
use aws_types::region::Region;
use bytes::Bytes;
use http::{StatusCode, Uri};
use bytes::{Buf, Bytes};
use http::{Response, StatusCode, Uri};
use hyper::{body, Body};
use serde::Deserialize;
use snafu::ResultExt;
use vector_core::config::proxy::ProxyConfig;

use super::{
request_builder::ElasticsearchRequestBuilder, ElasticsearchEncoder, InvalidHostSnafu, Request,
request_builder::ElasticsearchRequestBuilder, ElasticsearchApiVersion, ElasticsearchEncoder,
InvalidHostSnafu, Request,
};
use crate::{
http::{Auth, HttpClient, MaybeAuth},
Expand Down Expand Up @@ -38,7 +42,12 @@ pub struct ElasticsearchCommon {
}

impl ElasticsearchCommon {
pub async fn parse_config(config: &ElasticsearchConfig, endpoint: &str) -> crate::Result<Self> {
pub async fn parse_config(
config: &ElasticsearchConfig,
endpoint: &str,
proxy_config: &ProxyConfig,
version: &mut Option<usize>,
) -> crate::Result<Self> {
// Test the configured host, but ignore the result
let uri = format!("{}/_test", endpoint);
let uri = uri
Expand Down Expand Up @@ -78,16 +87,6 @@ impl ElasticsearchCommon {

let mode = config.common_mode()?;

let doc_type = config.doc_type.clone().unwrap_or_else(|| "_doc".into());
let request_builder = ElasticsearchRequestBuilder {
compression: config.compression,
encoder: ElasticsearchEncoder {
transformer: config.encoding.clone(),
doc_type,
suppress_type_name: config.suppress_type_name,
},
};

let tower_request = config
.request
.tower
Expand All @@ -103,11 +102,13 @@ impl ElasticsearchCommon {
query_params.insert("pipeline".into(), pipeline.into());
}

let mut query = url::form_urlencoded::Serializer::new(String::new());
for (p, v) in &query_params {
query.append_pair(&p[..], &v[..]);
}
let bulk_url = format!("{}/_bulk?{}", base_url, query.finish());
let bulk_url = {
let mut query = url::form_urlencoded::Serializer::new(String::new());
for (p, v) in &query_params {
query.append_pair(&p[..], &v[..]);
}
format!("{}/_bulk?{}", base_url, query.finish())
};
let bulk_uri = bulk_url.parse::<Uri>().unwrap();

let tls_settings = TlsSettings::from_options(&config.tls)?;
Expand All @@ -122,6 +123,60 @@ impl ElasticsearchCommon {

let region = config.aws.as_ref().and_then(|config| config.region());

let version = if let Some(version) = *version {
version
} else {
let ver = match config.api_version {
ElasticsearchApiVersion::V6 => 6,
ElasticsearchApiVersion::V7 => 7,
ElasticsearchApiVersion::V8 => 8,
ElasticsearchApiVersion::Auto => {
match get_version(
&base_url,
&http_auth,
&aws_auth,
&region,
&request,
&tls_settings,
proxy_config,
)
.await
{
Ok(version) => version,
// This error should be fatal, but for now we only emit it as a warning
// to make the transition smoother.
Err(error) => {
// For now, estimate version.
let assumed_version = match config.suppress_type_name {
Some(true) => 8,
_ => 6,
};
warn!(message = "Failed to determine Elasticsearch version from `/_cluster/state/version`. Please fix the reported error or set an API version explicitly via `api_version`.",%assumed_version, %error);
assumed_version
}
}
}
};
*version = Some(ver);
ver
};

let doc_type = config.doc_type.clone().unwrap_or_else(|| "_doc".into());
let suppress_type_name = if let Some(suppress_type_name) = config.suppress_type_name {
warn!(message = "DEPRECATION, use of deprecated option `suppress_type_name`. Please use `api_version` option instead.");
suppress_type_name
} else {
version >= 7
};
let request_builder = ElasticsearchRequestBuilder {
compression: config.compression,
encoder: ElasticsearchEncoder {
transformer: config.encoding.clone(),
doc_type,
suppress_type_name,
},
};

Ok(Self {
http_auth,
base_url,
Expand All @@ -138,11 +193,17 @@ impl ElasticsearchCommon {
}

/// Parses endpoints into a vector of ElasticsearchCommons. The resulting vector is guaranteed to not be empty.
pub async fn parse_many(config: &ElasticsearchConfig) -> crate::Result<Vec<Self>> {
pub async fn parse_many(
config: &ElasticsearchConfig,
proxy_config: &ProxyConfig,
) -> crate::Result<Vec<Self>> {
let mut version = None;
if let Some(endpoint) = config.endpoint.as_ref() {
warn!(message = "DEPRECATION, use of deprecated option `endpoint`. Please use `endpoints` option instead.");
if config.endpoints.is_empty() {
Ok(vec![Self::parse_config(config, endpoint).await?])
Ok(vec![
Self::parse_config(config, endpoint, proxy_config, &mut version).await?,
])
} else {
Err(ParseError::EndpointsExclusive.into())
}
Expand All @@ -151,7 +212,8 @@ impl ElasticsearchCommon {
} else {
let mut commons = Vec::new();
for endpoint in config.endpoints.iter() {
commons.push(Self::parse_config(config, endpoint).await?);
commons
.push(Self::parse_config(config, endpoint, proxy_config, &mut version).await?);
}
Ok(commons)
}
Expand All @@ -160,30 +222,25 @@ impl ElasticsearchCommon {
/// Parses a single endpoint, else panics.
#[cfg(test)]
pub async fn parse_single(config: &ElasticsearchConfig) -> crate::Result<Self> {
let mut commons = Self::parse_many(config).await?;
assert!(commons.len() == 1);
let mut commons =
Self::parse_many(config, crate::config::SinkContext::new_test().proxy()).await?;
assert_eq!(commons.len(), 1);
Ok(commons.remove(0))
}

pub async fn healthcheck(self, client: HttpClient) -> crate::Result<()> {
let mut builder = Request::get(format!("{}/_cluster/health", self.base_url));

if let Some(authorization) = &self.http_auth {
builder = authorization.apply_builder(builder);
}

for (header, value) in &self.request.headers {
builder = builder.header(&header[..], &value[..]);
}

let mut request = builder.body(Bytes::new())?;

if let Some(credentials_provider) = &self.aws_auth {
sign_request(&mut request, credentials_provider, &self.region).await?;
}
let response = client.send(request.map(hyper::Body::from)).await?;

match response.status() {
match get(
&self.base_url,
&self.http_auth,
&self.aws_auth,
&self.region,
&self.request,
client,
"/_cluster/health",
)
.await?
.status()
{
StatusCode::OK => Ok(()),
status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
}
Expand All @@ -197,3 +254,67 @@ pub async fn sign_request(
) -> crate::Result<()> {
crate::aws::sign_request("es", request, credentials_provider, region).await
}

async fn get_version(
base_url: &str,
http_auth: &Option<Auth>,
aws_auth: &Option<SharedCredentialsProvider>,
region: &Option<Region>,
request: &RequestConfig,
tls_settings: &TlsSettings,
proxy_config: &ProxyConfig,
) -> crate::Result<usize> {
#[derive(Deserialize)]
struct ClusterState {
version: Option<usize>,
}

let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
let response = get(
base_url,
http_auth,
aws_auth,
region,
request,
client,
"/_cluster/state/version",
)
.await
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;

let (_, body) = response.into_parts();
let mut body = body::aggregate(body).await?;
let body = body.copy_to_bytes(body.remaining());
let ClusterState { version } = serde_json::from_slice(&body)?;
version.ok_or_else(||"Unexpected response from Elasticsearch endpoint `/_cluster/state/version`. Missing `version`. Consider setting `api_version` option.".into())
}

async fn get(
base_url: &str,
http_auth: &Option<Auth>,
aws_auth: &Option<SharedCredentialsProvider>,
region: &Option<Region>,
request: &RequestConfig,
client: HttpClient,
path: &str,
) -> crate::Result<Response<Body>> {
let mut builder = Request::get(format!("{}{}", base_url, path));

if let Some(authorization) = &http_auth {
builder = authorization.apply_builder(builder);
}

for (header, value) in &request.headers {
builder = builder.header(&header[..], &value[..]);
}

let mut request = builder.body(Bytes::new())?;

if let Some(credentials_provider) = aws_auth {
sign_request(&mut request, credentials_provider, region).await?;
}
client
.send(request.map(hyper::Body::from))
.await
.map_err(Into::into)
}
40 changes: 35 additions & 5 deletions src/sinks/elasticsearch/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use crate::{
retry::ElasticsearchRetryLogic,
service::{ElasticsearchService, HttpRequestBuilder},
sink::ElasticsearchSink,
BatchActionTemplateSnafu, ElasticsearchAuth, ElasticsearchCommon,
ElasticsearchCommonMode, ElasticsearchMode, IndexTemplateSnafu,
BatchActionTemplateSnafu, ElasticsearchApiVersion, ElasticsearchAuth,
ElasticsearchCommon, ElasticsearchCommonMode, ElasticsearchMode, IndexTemplateSnafu,
},
util::{
http::RequestConfig, service::HealthConfig, BatchConfig, Compression,
Expand Down Expand Up @@ -61,13 +61,19 @@ pub struct ElasticsearchConfig {
/// set this option since Elasticsearch has removed it.
pub doc_type: Option<String>,

/// The API version of Elasticsearch.
#[serde(default)]
pub api_version: ElasticsearchApiVersion,

/// Whether or not to send the `type` field to Elasticsearch.
///
/// `type` field was deprecated in Elasticsearch 7.x and removed in Elasticsearch 8.x.
///
/// If enabled, the `doc_type` option will be ignored.
#[serde(default)]
pub suppress_type_name: bool,
///
/// This option has been deprecated, the `api_version` option should be used instead.
#[configurable(deprecated)]
pub suppress_type_name: Option<bool>,

/// Whether or not to retry successful requests containing partial failures.
///
Expand Down Expand Up @@ -381,7 +387,7 @@ impl DataStreamConfig {
#[async_trait::async_trait]
impl SinkConfig for ElasticsearchConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let commons = ElasticsearchCommon::parse_many(self).await?;
let commons = ElasticsearchCommon::parse_many(self, cx.proxy()).await?;
let common = commons[0].clone();

let client = HttpClient::new(common.tls_settings.clone(), cx.proxy())?;
Expand Down Expand Up @@ -491,4 +497,28 @@ mod tests {
)
.unwrap();
}

#[test]
fn parse_version() {
let config = toml::from_str::<ElasticsearchConfig>(
r#"
endpoints = [""]
api_version = "v7"
"#,
)
.unwrap();
assert_eq!(config.api_version, ElasticsearchApiVersion::V7);
}

#[test]
fn parse_version_auto() {
let config = toml::from_str::<ElasticsearchConfig>(
r#"
endpoints = [""]
api_version = "auto"
"#,
)
.unwrap();
assert_eq!(config.api_version, ElasticsearchApiVersion::Auto);
}
}
Loading

0 comments on commit 0719f34

Please sign in to comment.