Skip to content

Commit 38ad215

Browse files
committed
WIP
1 parent 78becf0 commit 38ad215

24 files changed

+209
-75
lines changed

quickwit/quickwit-indexing/src/actors/doc_processor.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ use tokio::runtime::Handle;
4141
use super::vrl_processing::*;
4242
use crate::actors::Indexer;
4343
use crate::models::{
44-
NewPublishLock, NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock, RawDocBatch,
44+
NewPublishLock, NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock, RawDoc,
45+
RawDocBatch,
4546
};
4647

4748
const PLAIN_TEXT: &str = "plain_text";
@@ -461,17 +462,20 @@ impl DocProcessor {
461462
Ok(Some(timestamp))
462463
}
463464

464-
fn process_raw_doc(&mut self, raw_doc: Bytes, processed_docs: &mut Vec<ProcessedDoc>) {
465-
let num_bytes = raw_doc.len();
465+
fn process_raw_doc(&mut self, raw_doc: RawDoc, processed_docs: &mut Vec<ProcessedDoc>) {
466+
let num_bytes = raw_doc.doc.len();
466467

467468
#[cfg(feature = "vrl")]
468469
let transform_opt = self.transform_opt.as_mut();
469470
#[cfg(not(feature = "vrl"))]
470471
let transform_opt: Option<&mut VrlProgram> = None;
471472

472-
for json_doc_result in parse_raw_doc(self.input_format, raw_doc, num_bytes, transform_opt) {
473-
let processed_doc_result =
474-
json_doc_result.and_then(|json_doc| self.process_json_doc(json_doc));
473+
for json_doc_result in
474+
parse_raw_doc(self.input_format, raw_doc.doc, num_bytes, transform_opt)
475+
{
476+
let processed_doc_result = json_doc_result.and_then(|json_doc| {
477+
self.process_json_doc(json_doc, raw_doc.arrival_timestamp_millis_opt)
478+
});
475479

476480
match processed_doc_result {
477481
Ok(processed_doc) => {
@@ -491,7 +495,11 @@ impl DocProcessor {
491495
}
492496
}
493497

494-
fn process_json_doc(&self, json_doc: JsonDoc) -> Result<ProcessedDoc, DocProcessorError> {
498+
fn process_json_doc(
499+
&self,
500+
json_doc: JsonDoc,
501+
arrival_timestamp_millis_opt: Option<u64>,
502+
) -> Result<ProcessedDoc, DocProcessorError> {
495503
let num_bytes = json_doc.num_bytes;
496504

497505
let (partition, doc) = self
@@ -503,6 +511,7 @@ impl DocProcessor {
503511
timestamp_opt,
504512
partition,
505513
num_bytes,
514+
arrival_timestamp_millis_opt,
506515
})
507516
}
508517
}
@@ -572,9 +581,10 @@ impl Handler<RawDocBatch> for DocProcessor {
572581
if self.publish_lock.is_dead() {
573582
return Ok(());
574583
}
575-
let mut processed_docs: Vec<ProcessedDoc> = Vec::with_capacity(raw_doc_batch.docs.len());
584+
let mut processed_docs: Vec<ProcessedDoc> =
585+
Vec::with_capacity(raw_doc_batch.raw_docs.len());
576586

577-
for raw_doc in raw_doc_batch.docs {
587+
for raw_doc in raw_doc_batch.raw_docs {
578588
let _protected_zone_guard = ctx.protect_zone();
579589
self.process_raw_doc(raw_doc, &mut processed_docs);
580590
ctx.record_progress();

quickwit/quickwit-indexing/src/actors/indexer.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,7 @@ impl IndexerState {
305305
timestamp_opt,
306306
partition,
307307
num_bytes,
308+
arrival_timestamp_millis_opt,
308309
} = doc;
309310
counters.num_docs_in_workbench += 1;
310311
let (indexed_split, split_created) = self.get_or_create_indexed_split(
@@ -326,6 +327,12 @@ impl IndexerState {
326327
if let Some(timestamp) = timestamp_opt {
327328
record_timestamp(timestamp, &mut indexed_split.split_attrs.time_range);
328329
}
330+
if let Some(arrival_timestamp_millis) = arrival_timestamp_millis_opt {
331+
record_arrival_timestamp(
332+
arrival_timestamp_millis,
333+
&mut indexed_split.split_attrs.min_arrival_timestamp_millis_opt,
334+
);
335+
}
329336
let _protect_guard = ctx.protect_zone();
330337
indexed_split
331338
.index_writer
@@ -468,6 +475,14 @@ fn record_timestamp(timestamp: DateTime, time_range: &mut Option<RangeInclusive<
468475
*time_range = Some(new_timestamp_range);
469476
}
470477

478+
fn record_arrival_timestamp(
479+
arrival_timestamp_millis: u64,
480+
min_arrival_timestamp_millis_opt: &mut Option<u64>,
481+
) {
482+
let current_min = min_arrival_timestamp_millis_opt.get_or_insert(arrival_timestamp_millis);
483+
*current_min = arrival_timestamp_millis.min(*current_min);
484+
}
485+
471486
#[async_trait]
472487
impl Handler<CommitTimeout> for Indexer {
473488
type Reply = ();
@@ -797,6 +812,7 @@ mod tests {
797812
timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)),
798813
partition: 1,
799814
num_bytes: 30,
815+
arrival_timestamp_millis_opt: None,
800816
},
801817
ProcessedDoc {
802818
doc: doc!(
@@ -806,6 +822,7 @@ mod tests {
806822
timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)),
807823
partition: 1,
808824
num_bytes: 30,
825+
arrival_timestamp_millis_opt: None,
809826
},
810827
],
811828
SourceCheckpointDelta::from_range(4..6),
@@ -823,6 +840,7 @@ mod tests {
823840
timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435i64)),
824841
partition: 1,
825842
num_bytes: 30,
843+
arrival_timestamp_millis_opt: None,
826844
},
827845
ProcessedDoc {
828846
doc: doc!(
@@ -832,6 +850,7 @@ mod tests {
832850
timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)),
833851
partition: 1,
834852
num_bytes: 30,
853+
arrival_timestamp_millis_opt: None,
835854
},
836855
],
837856
SourceCheckpointDelta::from_range(6..8),
@@ -848,6 +867,7 @@ mod tests {
848867
timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)),
849868
partition: 1,
850869
num_bytes: 30,
870+
arrival_timestamp_millis_opt: None,
851871
}],
852872
SourceCheckpointDelta::from_range(8..9),
853873
false,
@@ -933,6 +953,7 @@ mod tests {
933953
timestamp_opt: None,
934954
partition: 0,
935955
num_bytes,
956+
arrival_timestamp_millis_opt: None,
936957
}
937958
};
938959
for i in 0..10_000 {
@@ -1011,6 +1032,7 @@ mod tests {
10111032
timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)),
10121033
partition: 1,
10131034
num_bytes: 30,
1035+
arrival_timestamp_millis_opt: None,
10141036
}],
10151037
SourceCheckpointDelta::from_range(position..position + 1),
10161038
false,
@@ -1089,6 +1111,7 @@ mod tests {
10891111
timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)),
10901112
partition: 1,
10911113
num_bytes: 30,
1114+
arrival_timestamp_millis_opt: None,
10921115
}],
10931116
SourceCheckpointDelta::from_range(8..9),
10941117
false,
@@ -1176,6 +1199,7 @@ mod tests {
11761199
timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_529_435)),
11771200
partition: 1,
11781201
num_bytes: 30,
1202+
arrival_timestamp_millis_opt: None,
11791203
}],
11801204
SourceCheckpointDelta::from_range(8..9),
11811205
false,
@@ -1260,6 +1284,7 @@ mod tests {
12601284
timestamp_opt: None,
12611285
partition: 1,
12621286
num_bytes: 30,
1287+
arrival_timestamp_millis_opt: None,
12631288
},
12641289
ProcessedDoc {
12651290
doc: doc!(
@@ -1269,6 +1294,7 @@ mod tests {
12691294
timestamp_opt: None,
12701295
partition: 3,
12711296
num_bytes: 30,
1297+
arrival_timestamp_millis_opt: None,
12721298
},
12731299
],
12741300
SourceCheckpointDelta::from_range(8..9),
@@ -1354,6 +1380,7 @@ mod tests {
13541380
timestamp_opt: None,
13551381
partition,
13561382
num_bytes: 30,
1383+
arrival_timestamp_millis_opt: None,
13571384
}],
13581385
SourceCheckpointDelta::from_range(partition..partition + 1),
13591386
false,
@@ -1432,6 +1459,7 @@ mod tests {
14321459
timestamp_opt: None,
14331460
partition: 0,
14341461
num_bytes: 30,
1462+
arrival_timestamp_millis_opt: None,
14351463
}],
14361464
SourceCheckpointDelta::from_range(0..1),
14371465
false,
@@ -1503,6 +1531,7 @@ mod tests {
15031531
timestamp_opt: None,
15041532
partition: 0,
15051533
num_bytes: 30,
1534+
arrival_timestamp_millis_opt: None,
15061535
}],
15071536
SourceCheckpointDelta::from_range(0..1),
15081537
false,
@@ -1559,6 +1588,7 @@ mod tests {
15591588
timestamp_opt: None,
15601589
partition: 0,
15611590
num_bytes: 30,
1591+
arrival_timestamp_millis_opt: None,
15621592
}],
15631593
SourceCheckpointDelta::from_range(0..1),
15641594
true,

quickwit/quickwit-indexing/src/actors/merge_executor.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ pub fn merge_split_attrs(
287287
uncompressed_docs_size_in_bytes,
288288
delete_opstamp,
289289
num_merge_ops: max_merge_ops(splits) + 1,
290+
min_arrival_timestamp_millis_opt: None,
290291
})
291292
}
292293

@@ -470,6 +471,7 @@ impl MergeExecutor {
470471
uncompressed_docs_size_in_bytes,
471472
delete_opstamp: last_delete_opstamp,
472473
num_merge_ops: split.num_merge_ops,
474+
min_arrival_timestamp_millis_opt: None,
473475
},
474476
index: merged_index,
475477
split_scratch_directory: merge_scratch_directory,

quickwit/quickwit-indexing/src/actors/packager.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,7 @@ mod tests {
529529
replaced_split_ids: Vec::new(),
530530
delete_opstamp: 0,
531531
num_merge_ops: 0,
532+
min_arrival_timestamp_millis_opt: None,
532533
},
533534
index,
534535
split_scratch_directory,

quickwit/quickwit-indexing/src/actors/publisher.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ use fail::fail_point;
1818
use quickwit_actors::{Actor, ActorContext, Handler, Mailbox, QueueCapacity};
1919
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient, PublishSplitsRequest};
2020
use serde::Serialize;
21+
use time::OffsetDateTime;
2122
use tracing::{info, instrument, warn};
2223

2324
use crate::actors::MergePlanner;
25+
use crate::metrics::INDEXER_METRICS;
2426
use crate::models::{NewSplits, SplitsUpdate};
2527
use crate::source::{SourceActor, SuggestTruncate};
2628

@@ -143,6 +145,10 @@ impl Handler<SplitsUpdate> for Publisher {
143145
.iter()
144146
.map(|split| split.split_id.clone())
145147
.collect();
148+
let min_arrival_timestamp_millis: Vec<u64> = new_splits
149+
.iter()
150+
.flat_map(|split| split.min_arrival_timestamp_millis_opt)
151+
.collect();
146152
if let Some(_guard) = publish_lock.acquire().await {
147153
let publish_splits_request = PublishSplitsRequest {
148154
index_uid: Some(index_uid),
@@ -163,8 +169,8 @@ impl Handler<SplitsUpdate> for Publisher {
163169
return Ok(());
164170
}
165171
info!("publish-new-splits");
166-
if let Some(source_mailbox) = self.source_mailbox_opt.as_ref()
167-
&& let Some(checkpoint) = checkpoint_delta_opt
172+
if let Some(source_mailbox) = &self.source_mailbox_opt
173+
&& let Some(checkpoint) = &checkpoint_delta_opt
168174
{
169175
// We voluntarily do not log anything here.
170176
//
@@ -182,9 +188,8 @@ impl Handler<SplitsUpdate> for Publisher {
182188
warn!(error=?send_truncate_err, "failed to send truncate message from publisher to source");
183189
}
184190
}
185-
186191
if !new_splits.is_empty() {
187-
// The merge planner is not necessarily awake and this is not an error.
192+
// The merge planner is not necessarily awake and this πis not an error.
188193
// For instance, when a source reaches its end, and the last "new" split
189194
// has been packaged, the packager finalizer sends a message to the merge
190195
// planner in order to stop it.
@@ -193,7 +198,6 @@ impl Handler<SplitsUpdate> for Publisher {
193198
.send_message(merge_planner_mailbox, NewSplits { new_splits })
194199
.await;
195200
}
196-
197201
if replaced_split_ids.is_empty() {
198202
self.counters.num_published_splits += 1;
199203
} else {
@@ -202,6 +206,18 @@ impl Handler<SplitsUpdate> for Publisher {
202206
} else {
203207
self.counters.num_empty_splits += 1;
204208
}
209+
let now = OffsetDateTime::now_utc();
210+
let now_timestamp = now.unix_timestamp();
211+
212+
for min_arrival_timestamp_millis in min_arrival_timestamp_millis {
213+
let min_arrival_timestamp_secs = min_arrival_timestamp_millis as i64 / 1_000;
214+
215+
if let Some(lag_seconds) = now_timestamp.checked_sub(min_arrival_timestamp_secs) {
216+
INDEXER_METRICS
217+
.indexing_lag_seconds
218+
.observe(lag_seconds as f64);
219+
}
220+
}
205221
fail_point!("publisher:after");
206222
Ok(())
207223
}

quickwit/quickwit-indexing/src/actors/uploader.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,7 @@ mod tests {
593593
split_id: "test-split".to_string(),
594594
delete_opstamp: 10,
595595
num_merge_ops: 0,
596+
min_arrival_timestamp_millis_opt: None,
596597
},
597598
serialized_split_fields: Vec::new(),
598599
split_scratch_directory,
@@ -707,6 +708,7 @@ mod tests {
707708
],
708709
delete_opstamp: 0,
709710
num_merge_ops: 0,
711+
min_arrival_timestamp_millis_opt: None,
710712
},
711713
serialized_split_fields: Vec::new(),
712714
split_scratch_directory: split_scratch_directory_1,
@@ -734,6 +736,7 @@ mod tests {
734736
],
735737
delete_opstamp: 0,
736738
num_merge_ops: 0,
739+
min_arrival_timestamp_millis_opt: None,
737740
},
738741
serialized_split_fields: Vec::new(),
739742
split_scratch_directory: split_scratch_directory_2,
@@ -854,6 +857,7 @@ mod tests {
854857
replaced_split_ids: Vec::new(),
855858
delete_opstamp: 10,
856859
num_merge_ops: 0,
860+
min_arrival_timestamp_millis_opt: None,
857861
},
858862
serialized_split_fields: Vec::new(),
859863
split_scratch_directory,
@@ -1036,6 +1040,7 @@ mod tests {
10361040
split_id: SPLIT_ULID_STR.to_string(),
10371041
delete_opstamp: 10,
10381042
num_merge_ops: 0,
1043+
min_arrival_timestamp_millis_opt: None,
10391044
},
10401045
serialized_split_fields: Vec::new(),
10411046
split_scratch_directory,

quickwit/quickwit-indexing/src/metrics.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414

1515
use once_cell::sync::Lazy;
1616
use quickwit_common::metrics::{
17-
IntCounter, IntCounterVec, IntGauge, IntGaugeVec, new_counter, new_counter_vec, new_gauge,
18-
new_gauge_vec,
17+
Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, linear_buckets,
18+
new_counter, new_counter_vec, new_gauge, new_gauge_vec, new_histogram, new_histogram_vec,
1919
};
2020

2121
pub struct IndexerMetrics {
@@ -31,6 +31,7 @@ pub struct IndexerMetrics {
3131
// We use a lazy counter, as most users do not use Kafka.
3232
#[cfg_attr(not(feature = "kafka"), allow(dead_code))]
3333
pub kafka_rebalance_total: Lazy<IntCounter>,
34+
pub indexing_lag_seconds: Histogram,
3435
}
3536

3637
impl Default for IndexerMetrics {
@@ -106,6 +107,13 @@ impl Default for IndexerMetrics {
106107
&[],
107108
)
108109
}),
110+
indexing_lag_seconds: new_histogram(
111+
"indexing_lag_seconds",
112+
"FIXME",
113+
"indexing",
114+
linear_buckets(0.0, 5.0, 120)
115+
.expect("buckets should have a width and count greater than 0"),
116+
),
109117
}
110118
}
111119
}

0 commit comments

Comments
 (0)