Skip to content

Commit 0cc67e5

Browse files
committed
store: Instrument transact_block more
1 parent 3a4295f commit 0cc67e5

File tree

1 file changed

+21
-12
lines changed

1 file changed

+21
-12
lines changed

store/postgres/src/writable.rs

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1008,7 +1008,11 @@ impl Queue {
10081008
/// to fill up before writing them to maximize the chances that we build
10091009
/// a 'full' write batch, i.e., one that is either big enough or old
10101010
/// enough
1011-
async fn push_write(&self, batch: Batch) -> Result<(), StoreError> {
1011+
async fn push_write(
1012+
&self,
1013+
batch: Batch,
1014+
stopwatch: &StopwatchMetrics,
1015+
) -> Result<(), StoreError> {
10121016
let batch = if ENV_VARS.store.write_batch_size == 0
10131017
|| ENV_VARS.store.write_batch_duration.is_zero()
10141018
|| !self.batch_writes()
@@ -1049,6 +1053,8 @@ impl Queue {
10491053
match existing.try_write() {
10501054
Ok(mut existing) => {
10511055
if existing.weight() < ENV_VARS.store.write_batch_size {
1056+
let _section =
1057+
stopwatch.start_section("transact_block:append_batch");
10521058
let res = existing.append(batch).map(|()| None);
10531059
if existing.weight() >= ENV_VARS.store.write_batch_size {
10541060
self.batch_ready_notify.notify_one();
@@ -1369,7 +1375,7 @@ impl Writer {
13691375
Writer::Sync(store) => store.transact_block_operations(&batch, stopwatch),
13701376
Writer::Async { queue, .. } => {
13711377
self.check_queue_running()?;
1372-
queue.push_write(batch).await
1378+
queue.push_write(batch, stopwatch).await
13731379
}
13741380
}
13751381
}
@@ -1698,16 +1704,19 @@ impl WritableStoreTrait for WritableStore {
16981704
}
16991705
}
17001706

1701-
let batch = Batch::new(
1702-
block_ptr_to.clone(),
1703-
block_time,
1704-
firehose_cursor.clone(),
1705-
mods,
1706-
data_sources,
1707-
deterministic_errors,
1708-
processed_data_sources,
1709-
is_non_fatal_errors_active,
1710-
)?;
1707+
let batch = {
1708+
let _section = stopwatch.start_section("transact_block:batch_new");
1709+
Batch::new(
1710+
block_ptr_to.clone(),
1711+
block_time,
1712+
firehose_cursor.clone(),
1713+
mods,
1714+
data_sources,
1715+
deterministic_errors,
1716+
processed_data_sources,
1717+
is_non_fatal_errors_active,
1718+
)?
1719+
};
17111720
self.writer.write(batch, stopwatch).await?;
17121721

17131722
*self.block_ptr.lock().unwrap() = Some(block_ptr_to);

0 commit comments

Comments
 (0)