Skip to content

Commit

Permalink
Refactoring & format
Browse files Browse the repository at this point in the history
  • Loading branch information
johtani committed Aug 31, 2020
1 parent 10b543a commit dc2adb7
Showing 1 changed file with 47 additions and 60 deletions.
107 changes: 47 additions & 60 deletions src/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct EsConfig {
impl EsConfig {
fn new(config_file: &str) -> Self {
let f = File::open(config_file)
.expect(format!("config file is not found. {}", config_file).as_str());
.expect(format!("Config file is not found. {}", config_file).as_str());
let config: EsConfig = serde_yaml::from_reader(f).expect(format!("Parse Error").as_str());
return config;
}
Expand All @@ -37,7 +37,7 @@ pub struct ElasticsearchOutput {
config: EsConfig,
}

pub fn load_schema(schema_file: &str) -> Value {
fn load_schema(schema_file: &str) -> Value {
info!("schema file is {}", schema_file);
let f = File::open(schema_file)
.expect(format!("schema file is not found. {}", schema_file).as_str());
Expand All @@ -52,11 +52,10 @@ impl ElasticsearchOutput {
debug!("url: {}", config.url);
debug!("buffer_size: {}", config.buffer_size);
let client = ElasticsearchOutput::create_elasticsearch_client(&config);
let buffer = vec![];
ElasticsearchOutput {
client,
buffer,
config,
buffer: vec![],
}
}

Expand All @@ -78,12 +77,6 @@ impl ElasticsearchOutput {
}
}

pub fn exist_index(&self) -> bool {
let mut _rt = tokio::runtime::Runtime::new().expect("Fail initializing runtime");
let task = self.call_indices_exists();
_rt.block_on(task).expect("Something wrong...")
}

pub fn close(&mut self) {
let chunk_size = if self.buffer.len() <= self.config.buffer_size {
self.buffer.len()
Expand All @@ -106,47 +99,41 @@ impl ElasticsearchOutput {
fn create_credentials(config: &EsConfig) -> Option<Credentials> {
match &config.user {
None => None,
Some(user) => {
match &config.password {
None => None,
Some(password) => {
Some(Credentials::Basic(user.to_string(), password.to_string()))
}
}
}
Some(user) => match &config.password {
None => None,
Some(password) => Some(Credentials::Basic(user.to_string(), password.to_string())),
},
}
}

fn create_elasticsearch_client(config: &EsConfig) -> Elasticsearch {
match &config.cloud_id {
return match &config.cloud_id {
None => {
debug!("Using url...");
let url = Url::parse(config.url.as_str()).unwrap();
let conn_pool = SingleNodeConnectionPool::new(url);
let builder: TransportBuilder =
match ElasticsearchOutput::create_credentials(&config) {
None => TransportBuilder::new(conn_pool).disable_proxy(),
Some(creds) => TransportBuilder::new(conn_pool).disable_proxy().auth(creds),
};
let transport = builder.build().unwrap();
return Elasticsearch::new(transport);
let builder =
TransportBuilder::new(SingleNodeConnectionPool::new(url)).disable_proxy();
match ElasticsearchOutput::create_credentials(&config) {
None => Elasticsearch::new(builder.build().unwrap()),
Some(credentials) => {
Elasticsearch::new(builder.auth(credentials).build().unwrap())
}
}
}
Some(cloud_id) => {
debug!("Using cloud_id...");
let credentials = match ElasticsearchOutput::create_credentials(&config) {
None => { panic!("Cannot create Credentials with user & password. Both user and password are required.") }
Some(creds) => { creds }
};
match Transport::cloud(cloud_id.as_str(), credentials) {
Ok(transport) => {
return Elasticsearch::new(transport);
}
Err(err) => {
panic!("Cannot create client for Elastic Cloud. {}", err);
}
}
let credentials = ElasticsearchOutput::create_credentials(&config).expect("Cannot create Credentials with user & password. Both user and password are required.");
let transport = Transport::cloud(cloud_id.as_str(), credentials)
.expect("Cannot create client for Elastic Cloud. ");
Elasticsearch::new(transport)
}
}
};
}

fn exist_index(&self) -> bool {
let mut _rt = tokio::runtime::Runtime::new().expect("Fail initializing runtime");
let task = self.call_indices_exists();
_rt.block_on(task).expect("Something wrong...")
}

async fn call_indices_create(&self) -> Result<(), String> {
Expand All @@ -159,22 +146,22 @@ impl ElasticsearchOutput {
.send()
.await;
return match response {
Ok(response) => {
match &response.error_for_status_code_ref() {
Ok(_) => {
info!("{} index was created.", &self.config.index_name);
Ok(())
}
Err(error) => {
warn!(
"Create index request has failed. Status Code is {:?}.",
error.status_code().unwrap()
);
if let Ok(body) = &response.text().await { warn!("{}", body); }
Err(String::from("Create index failed."))
Ok(response) => match &response.error_for_status_code_ref() {
Ok(_) => {
info!("{} index was created.", &self.config.index_name);
Ok(())
}
Err(error) => {
warn!(
"Create index request has failed. Status Code is {:?}.",
error.status_code().unwrap()
);
if let Ok(body) = &response.text().await {
warn!("{}", body);
}
Err(String::from("Create index failed."))
}
}
},
Err(error) => {
error!("create index failed. {}", error);
Err(error.to_string())
Expand All @@ -201,11 +188,10 @@ impl ElasticsearchOutput {
status_code
);
warn!("Indices exists request failed");
if let Ok(body) = &response.text().await { warn!("{}", body); }
Err(format!(
"Indices exists request failed. {:?}",
status_code
))
if let Ok(body) = &response.text().await {
warn!("{}", body);
}
Err(format!("Indices exists request failed. {:?}", status_code))
}
},
Err(error) => {
Expand All @@ -215,7 +201,7 @@ impl ElasticsearchOutput {
};
}

pub async fn proceed_chunk(&self, chunk: &[String]) -> Result<(), Box<dyn std::error::Error>> {
async fn proceed_chunk(&self, chunk: &[String]) -> Result<(), Box<dyn std::error::Error>> {
let mut body: Vec<JsonBody<_>> = Vec::new();
for d in chunk {
let doc_map: Map<String, Value> =
Expand All @@ -228,6 +214,7 @@ impl ElasticsearchOutput {
},
};
body.push(json!({"index": {"_id": id}}).into());
// TODO can we use d instead of doc_map?
body.push(JsonBody::from(serde_json::to_value(doc_map).unwrap()));
}
info!("Sending {} documents... ", chunk.len());
Expand Down

0 comments on commit dc2adb7

Please sign in to comment.