Skip to content

Commit 7bd0a38

Browse files
add comments, remove hard coded values for node types
1 parent 6f0b623 commit 7bd0a38

File tree

11 files changed

+116
-63
lines changed

11 files changed

+116
-63
lines changed

src/analytics.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::{
3636
http::{
3737
base_path_without_preceding_slash,
3838
cluster::{self, utils::check_liveness},
39-
modal::NodeMetadata,
39+
modal::{NodeMetadata, NodeType},
4040
},
4141
STREAM_NAME_HEADER_KEY,
4242
},
@@ -116,7 +116,9 @@ impl Report {
116116
let mut active_queries = 0;
117117
let mut inactive_queries = 0;
118118

119-
let indexer_infos: Vec<NodeMetadata> = cluster::get_node_info("indexer").await?;
119+
// check liveness of indexers
120+
// get the count of active and inactive indexers
121+
let indexer_infos: Vec<NodeMetadata> = cluster::get_node_info(NodeType::Indexer).await?;
120122
for indexer in indexer_infos {
121123
if check_liveness(&indexer.domain_name).await {
122124
active_indexers += 1;
@@ -125,7 +127,9 @@ impl Report {
125127
}
126128
}
127129

128-
let query_infos: Vec<NodeMetadata> = cluster::get_node_info("query").await?;
130+
// check liveness of queriers
131+
// get the count of active and inactive queriers
132+
let query_infos: Vec<NodeMetadata> = cluster::get_node_info(NodeType::Querier).await?;
129133
for query in query_infos {
130134
if check_liveness(&query.domain_name).await {
131135
active_queries += 1;
@@ -254,11 +258,14 @@ async fn fetch_ingestors_metrics(
254258
let mut vec = vec![];
255259
let mut active_ingestors = 0u64;
256260
let mut offline_ingestors = 0u64;
261+
262+
// for OSS, Query mode fetches the analytics report
263+
// for Enterprise, Prism mode fetches the analytics report
257264
if PARSEABLE.options.mode == Mode::Query || PARSEABLE.options.mode == Mode::Prism {
258265
// send analytics for ingest servers
259266

260267
// ingestor infos should be valid here, if not some thing is wrong
261-
let ingestor_infos: Vec<NodeMetadata> = cluster::get_node_info("ingestor").await?;
268+
let ingestor_infos: Vec<NodeMetadata> = cluster::get_node_info(NodeType::Ingestor).await?;
262269

263270
for im in ingestor_infos {
264271
if !check_liveness(&im.domain_name).await {

src/catalog/mod.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ use crate::{
3030
event::DEFAULT_TIMESTAMP_KEY,
3131
handlers::{
3232
self,
33-
http::{base_path_without_preceding_slash, modal::NodeMetadata},
33+
http::{
34+
base_path_without_preceding_slash,
35+
modal::{NodeMetadata, NodeType},
36+
},
3437
},
3538
metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE},
3639
option::Mode,
@@ -338,6 +341,10 @@ pub async fn remove_manifest_from_snapshot(
338341
meta.first_event_at = None;
339342
storage.put_snapshot(stream_name, meta.snapshot).await?;
340343
}
344+
345+
// retention is initiated from the querier
346+
// request is forwarded to all ingestors to clean up their manifests
347+
// no action required for the Index or Prism nodes
341348
match PARSEABLE.options.mode {
342349
Mode::All | Mode::Ingest => {
343350
Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
@@ -359,7 +366,6 @@ pub async fn get_first_event(
359366
) -> Result<Option<String>, ObjectStorageError> {
360367
let mut first_event_at: String = String::default();
361368
match PARSEABLE.options.mode {
362-
Mode::Index | Mode::Prism => unimplemented!(),
363369
Mode::All | Mode::Ingest => {
364370
// get current snapshot
365371
let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();
@@ -410,7 +416,7 @@ pub async fn get_first_event(
410416
}
411417
Mode::Query => {
412418
let ingestor_metadata: Vec<NodeMetadata> =
413-
handlers::http::cluster::get_node_info("ingestor")
419+
handlers::http::cluster::get_node_info(NodeType::Ingestor)
414420
.await
415421
.map_err(|err| {
416422
error!("Fatal: failed to get ingestor info: {:?}", err);
@@ -440,6 +446,7 @@ pub async fn get_first_event(
440446
}
441447
first_event_at = ingestors_first_event_at.iter().min().unwrap().to_string();
442448
}
449+
_ => {}
443450
}
444451

445452
Ok(Some(first_event_at))

src/cli.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,8 @@ impl Options {
447447
}
448448
}
449449

450+
/// get the address of the server
451+
/// based on the mode
450452
pub fn get_url(&self, mode: Mode) -> Url {
451453
let endpoint = match mode {
452454
Mode::Ingest => self.get_endpoint(&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT"),
@@ -458,6 +460,8 @@ impl Options {
458460
self.parse_endpoint(&endpoint)
459461
}
460462

463+
/// get the endpoint for the server
464+
/// if env var is empty, use the address, else use the env var
461465
fn get_endpoint(&self, endpoint: &str, env_var: &str) -> String {
462466
if endpoint.is_empty() {
463467
self.address.to_string()
@@ -472,6 +476,9 @@ impl Options {
472476
}
473477
}
474478

479+
/// parse the endpoint to get the address and port
480+
/// if the address is an env var, resolve it
481+
/// if the port is an env var, resolve it
475482
fn parse_endpoint(&self, endpoint: &str) -> Url {
476483
let addr_parts: Vec<&str> = endpoint.split(':').collect();
477484

@@ -488,6 +495,9 @@ impl Options {
488495
self.build_url(&format!("{}:{}", hostname, port))
489496
}
490497

498+
/// resolve the env var
499+
/// if the env var is not set, panic
500+
/// if the env var is set, return the value
491501
fn resolve_env_var(&self, value: &str) -> String {
492502
if let Some(env_var) = value.strip_prefix('$') {
493503
let resolved_value = env::var(env_var).unwrap_or_else(|_| {
@@ -510,6 +520,7 @@ impl Options {
510520
}
511521
}
512522

523+
/// build the url from the address
513524
fn build_url(&self, address: &str) -> Url {
514525
format!("{}://{}", self.get_scheme(), address)
515526
.parse::<Url>()

src/handlers/airplane.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use tonic::transport::{Identity, Server, ServerTlsConfig};
3434
use tonic_web::GrpcWebLayer;
3535

3636
use crate::handlers::http::cluster::get_node_info;
37-
use crate::handlers::http::modal::NodeMetadata;
37+
use crate::handlers::http::modal::{NodeMetadata, NodeType};
3838
use crate::handlers::http::query::{into_query, update_schema_when_distributed};
3939
use crate::handlers::livetail::cross_origin_config;
4040
use crate::metrics::QUERY_EXECUTE_TIME;
@@ -180,7 +180,7 @@ impl FlightService for AirServiceImpl {
180180
})
181181
.to_string();
182182

183-
let ingester_metadatas: Vec<NodeMetadata> = get_node_info("ingestor")
183+
let ingester_metadatas: Vec<NodeMetadata> = get_node_info(NodeType::Ingestor)
184184
.await
185185
.map_err(|err| Status::failed_precondition(err.to_string()))?;
186186
let mut minute_result: Vec<RecordBatch> = vec![];

src/handlers/http/about.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use serde_json::{json, Value};
2121

2222
use crate::{
2323
about::{self, get_latest_release},
24-
option::Mode,
2524
parseable::PARSEABLE,
2625
storage::StorageMetadata,
2726
};
@@ -63,12 +62,7 @@ pub async fn about() -> Json<Value> {
6362
let commit = current_release.commit_hash;
6463
let deployment_id = meta.deployment_id.to_string();
6564
let mode = PARSEABLE.get_server_mode_string();
66-
let staging = if PARSEABLE.options.mode == Mode::Query || PARSEABLE.options.mode == Mode::Prism
67-
{
68-
"".to_string()
69-
} else {
70-
PARSEABLE.options.staging_dir().display().to_string()
71-
};
65+
let staging = PARSEABLE.options.staging_dir().display().to_string();
7266
let grpc_port = PARSEABLE.options.grpc_port;
7367

7468
let store_endpoint = PARSEABLE.storage.get_endpoint();

0 commit comments

Comments
 (0)