Skip to content

Commit a820720

Browse files
committed
all: Measure loading time in EntityCache.as_modifications
This adds a new section "as_modifications_load" that shows us how much time we spend reading from the database
1 parent 8fa65fd commit a820720

File tree

8 files changed

+62
-45
lines changed

8 files changed

+62
-45
lines changed

core/src/amp_subgraph/runner/data_processing.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ async fn process_record_batch_group<AC>(
139139
entity_lfu_cache,
140140
evict_stats: _,
141141
} = entity_cache
142-
.as_modifications(block_number.compat())
142+
.as_modifications(block_number.compat(), &cx.metrics.stopwatch)
143143
.await
144144
.map_err(Error::from)
145145
.map_err(|e| e.context("failed to extract entity modifications from the state"))?;

core/src/subgraph/runner/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -686,7 +686,7 @@ where
686686
entity_lfu_cache: cache,
687687
evict_stats,
688688
} = entity_cache
689-
.as_modifications(block_ptr.number)
689+
.as_modifications(block_ptr.number, &self.metrics.host.stopwatch)
690690
.await
691691
.classify()?;
692692
section.end();
@@ -1571,7 +1571,7 @@ where
15711571
mods.extend(
15721572
block_state
15731573
.entity_cache
1574-
.as_modifications(block.number())
1574+
.as_modifications(block.number(), &self.metrics.subgraph.stopwatch)
15751575
.await?
15761576
.modifications,
15771577
);

graph/src/components/store/entity_cache.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::cheap_clone::CheapClone;
88
use crate::components::store::write::EntityModification;
99
use crate::components::store::{self as s, Entity, EntityOperation};
1010
use crate::data::store::{EntityValidationError, Id, IdType, IntoEntityIterator};
11-
use crate::prelude::{CacheWeight, ENV_VARS};
11+
use crate::prelude::{CacheWeight, StopwatchMetrics, ENV_VARS};
1212
use crate::schema::{EntityKey, InputSchema};
1313
use crate::util::intern::Error as InternError;
1414
use crate::util::lfu_cache::{EvictStats, LfuCache};
@@ -474,6 +474,7 @@ impl EntityCache {
474474
pub async fn as_modifications(
475475
mut self,
476476
block: BlockNumber,
477+
stopwatch: &StopwatchMetrics,
477478
) -> Result<ModificationsAndCache, StoreError> {
478479
assert!(!self.in_handler);
479480

@@ -491,10 +492,14 @@ impl EntityCache {
491492
// is wrong and the store already has a version of the entity from a
492493
// previous block, the attempt to insert will trigger a constraint
493494
// violation in the database, ensuring correctness
494-
let missing = missing.filter(|key| !key.entity_type.is_immutable());
495+
{
496+
let _section = stopwatch.start_section("as_modifications_load");
495497

496-
for (entity_key, entity) in self.store.get_many(missing.cloned().collect()).await? {
497-
self.current.insert(entity_key, Some(Arc::new(entity)));
498+
let missing = missing.filter(|key| !key.entity_type.is_immutable());
499+
500+
for (entity_key, entity) in self.store.get_many(missing.cloned().collect()).await? {
501+
self.current.insert(entity_key, Some(Arc::new(entity)));
502+
}
498503
}
499504

500505
let mut mods = Vec::new();

runtime/test/src/test.rs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use graph_runtime_wasm::{
2020
use semver::Version;
2121
use std::collections::{BTreeMap, HashMap};
2222
use std::str::FromStr;
23-
use test_store::{LOGGER, STORE};
23+
use test_store::{LOGGER, STOPWATCH, STORE, SUBGRAPH_STORE};
2424
use wasmtime::{AsContext, AsContextMut};
2525

2626
use crate::common::{mock_context, mock_data_source};
@@ -44,20 +44,20 @@ fn subgraph_id_with_api_version(subgraph_id: &str, api_version: Version) -> Stri
4444
)
4545
}
4646

47-
async fn test_valid_module_and_store(
47+
async fn test_module_and_deployment(
4848
subgraph_id: &str,
4949
data_source: DataSource,
5050
api_version: Version,
51-
) -> (WasmInstance, Arc<impl SubgraphStore>, DeploymentLocator) {
52-
test_valid_module_and_store_with_timeout(subgraph_id, data_source, api_version, None).await
51+
) -> (WasmInstance, DeploymentLocator) {
52+
test_module_and_deployment_with_timeout(subgraph_id, data_source, api_version, None).await
5353
}
5454

55-
async fn test_valid_module_and_store_with_timeout(
55+
async fn test_module_and_deployment_with_timeout(
5656
subgraph_id: &str,
5757
data_source: DataSource,
5858
api_version: Version,
5959
timeout: Option<Duration>,
60-
) -> (WasmInstance, Arc<impl SubgraphStore>, DeploymentLocator) {
60+
) -> (WasmInstance, DeploymentLocator) {
6161
let logger = Logger::root(slog::Discard, o!());
6262
let subgraph_id_with_api_version =
6363
subgraph_id_with_api_version(subgraph_id, api_version.clone());
@@ -80,7 +80,7 @@ async fn test_valid_module_and_store_with_timeout(
8080
}",
8181
)
8282
.await;
83-
let stopwatch_metrics = StopwatchMetrics::new(
83+
let stopwatch = StopwatchMetrics::new(
8484
logger.clone(),
8585
deployment_id.clone(),
8686
"test",
@@ -93,7 +93,7 @@ async fn test_valid_module_and_store_with_timeout(
9393
let host_metrics = Arc::new(HostMetrics::new(
9494
metrics_registry,
9595
deployment_id.as_str(),
96-
stopwatch_metrics,
96+
stopwatch.cheap_clone(),
9797
gas_metrics,
9898
));
9999

@@ -115,15 +115,15 @@ async fn test_valid_module_and_store_with_timeout(
115115
.await
116116
.unwrap();
117117

118-
(module, store.subgraph_store(), deployment)
118+
(module, deployment)
119119
}
120120

121121
pub async fn test_module(
122122
subgraph_id: &str,
123123
data_source: DataSource,
124124
api_version: Version,
125125
) -> WasmInstance {
126-
test_valid_module_and_store(subgraph_id, data_source, api_version)
126+
test_module_and_deployment(subgraph_id, data_source, api_version)
127127
.await
128128
.0
129129
}
@@ -135,9 +135,7 @@ pub async fn test_module_latest(subgraph_id: &str, wasm_file: &str) -> WasmInsta
135135
&wasm_file_path(wasm_file, API_VERSION_0_0_5),
136136
version.clone(),
137137
);
138-
test_valid_module_and_store(subgraph_id, ds, version)
139-
.await
140-
.0
138+
test_module_and_deployment(subgraph_id, ds, version).await.0
141139
}
142140

143141
pub trait SyncWasmTy: wasmtime::WasmTy + Sync {}
@@ -529,7 +527,7 @@ async fn run_ipfs_map(
529527
.to_owned()
530528
};
531529

532-
let (mut instance, _, _) = test_valid_module_and_store(
530+
let mut instance = test_module(
533531
subgraph_id,
534532
mock_data_source(
535533
&wasm_file_path("ipfs_map.wasm", api_version.clone()),
@@ -557,7 +555,7 @@ async fn run_ipfs_map(
557555
.take_ctx()
558556
.take_state()
559557
.entity_cache
560-
.as_modifications(0)
558+
.as_modifications(0, &STOPWATCH)
561559
.await?
562560
.modifications;
563561

@@ -1008,7 +1006,8 @@ async fn ens_name_by_hash_v0_0_5() {
10081006
}
10091007

10101008
async fn test_entity_store(api_version: Version) {
1011-
let (mut instance, store, deployment) = test_valid_module_and_store(
1009+
let store = SUBGRAPH_STORE.clone();
1010+
let (mut instance, deployment) = test_module_and_deployment(
10121011
"entityStore",
10131012
mock_data_source(
10141013
&wasm_file_path("store.wasm", api_version.clone()),
@@ -1073,7 +1072,11 @@ async fn test_entity_store(api_version: Version) {
10731072
&mut ctx.ctx.state.entity_cache,
10741073
EntityCache::new(Arc::new(writable.clone())),
10751074
);
1076-
let mut mods = cache.as_modifications(0).await.unwrap().modifications;
1075+
let mut mods = cache
1076+
.as_modifications(0, &STOPWATCH)
1077+
.await
1078+
.unwrap()
1079+
.modifications;
10771080
assert_eq!(1, mods.len());
10781081
match mods.pop().unwrap() {
10791082
EntityModification::Overwrite { data, .. } => {
@@ -1093,7 +1096,7 @@ async fn test_entity_store(api_version: Version) {
10931096
.take_ctx()
10941097
.take_state()
10951098
.entity_cache
1096-
.as_modifications(0)
1099+
.as_modifications(0, &STOPWATCH)
10971100
.await
10981101
.unwrap()
10991102
.modifications;
@@ -1626,7 +1629,7 @@ async fn generate_id() {
16261629

16271630
let entity_cache = host.ctx.state.entity_cache;
16281631
let mods = entity_cache
1629-
.as_modifications(12)
1632+
.as_modifications(12, &STOPWATCH)
16301633
.await
16311634
.unwrap()
16321635
.modifications;

runtime/test/src/test/abi.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use super::*;
77

88
async fn test_unbounded_loop(api_version: Version) {
99
// Set handler timeout to 3 seconds.
10-
let mut instance = test_valid_module_and_store_with_timeout(
10+
let mut instance = test_module_and_deployment_with_timeout(
1111
"unboundedLoop",
1212
mock_data_source(
1313
&wasm_file_path("non_terminating.wasm", api_version.clone()),

store/test-store/src/store.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,13 @@ lazy_static! {
6868
pub static ref NODE_ID: NodeId = NodeId::new("test").unwrap();
6969
pub static ref SUBGRAPH_STORE: Arc<DieselSubgraphStore> = STORE.subgraph_store();
7070
static ref BLOCK_STORE: DieselBlockStore = STORE.block_store();
71+
pub static ref STOPWATCH: StopwatchMetrics = StopwatchMetrics::new(
72+
Logger::root(slog::Discard, o!()),
73+
DeploymentHash::new("test").unwrap(),
74+
"test",
75+
METRICS_REGISTRY.clone(),
76+
"dummy".to_string(),
77+
);
7178
pub static ref GENESIS_PTR: BlockPtr = (
7279
B256::from(hex!(
7380
"bd34884280958002c51d3f7b5f853e6febeba33de0f40d15b0363006533c924f"
@@ -362,13 +369,6 @@ pub async fn transact_entities_and_dynamic_data_sources(
362369
Arc::new(manifest_idx_and_name),
363370
))?;
364371

365-
let mut entity_cache = EntityCache::new(Arc::new(store.clone()));
366-
entity_cache.append(ops);
367-
let mods = entity_cache
368-
.as_modifications(block_ptr_to.number)
369-
.await
370-
.expect("failed to convert to modifications")
371-
.modifications;
372372
let metrics_registry = Arc::new(MetricsRegistry::mock());
373373
let stopwatch_metrics = StopwatchMetrics::new(
374374
Logger::root(slog::Discard, o!()),
@@ -377,6 +377,14 @@ pub async fn transact_entities_and_dynamic_data_sources(
377377
metrics_registry.clone(),
378378
store.shard().to_string(),
379379
);
380+
381+
let mut entity_cache = EntityCache::new(Arc::new(store.clone()));
382+
entity_cache.append(ops);
383+
let mods = entity_cache
384+
.as_modifications(block_ptr_to.number, &stopwatch_metrics)
385+
.await
386+
.expect("failed to convert to modifications")
387+
.modifications;
380388
let block_time = BlockTime::for_test(&block_ptr_to);
381389
store
382390
.transact_block_operations(

store/test-store/tests/graph/entity_cache.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ fn sort_by_entity_key(mut mods: Vec<EntityModification>) -> Vec<EntityModificati
195195
async fn empty_cache_modifications() {
196196
let store = Arc::new(MockStore::new(BTreeMap::new()));
197197
let cache = EntityCache::new(store);
198-
let result = cache.as_modifications(0).await;
198+
let result = cache.as_modifications(0, &STOPWATCH).await;
199199
assert_eq!(result.unwrap().modifications, vec![]);
200200
}
201201

@@ -225,7 +225,7 @@ async fn insert_modifications() {
225225
mogwai_data.set_vid(100).unwrap();
226226
sigurros_data.set_vid(101).unwrap();
227227

228-
let result = cache.as_modifications(0).await;
228+
let result = cache.as_modifications(0, &STOPWATCH).await;
229229
assert_eq!(
230230
sort_by_entity_key(result.unwrap().modifications),
231231
sort_by_entity_key(vec![
@@ -276,7 +276,7 @@ async fn overwrite_modifications() {
276276
mogwai_data.set_vid(100).unwrap();
277277
sigurros_data.set_vid(101).unwrap();
278278

279-
let result = cache.as_modifications(0).await;
279+
let result = cache.as_modifications(0, &STOPWATCH).await;
280280
assert_eq!(
281281
sort_by_entity_key(result.unwrap().modifications),
282282
sort_by_entity_key(vec![
@@ -316,7 +316,7 @@ async fn consecutive_modifications() {
316316

317317
// We expect a single overwrite modification for the above that leaves "id"
318318
// and "name" untouched, sets "founded" and removes the "label" field.
319-
let result = cache.as_modifications(0).await;
319+
let result = cache.as_modifications(0, &STOPWATCH).await;
320320
assert_eq!(
321321
sort_by_entity_key(result.unwrap().modifications),
322322
sort_by_entity_key(vec![EntityModification::overwrite(
@@ -344,7 +344,7 @@ async fn check_vid_sequence() {
344344
.unwrap();
345345
}
346346

347-
let result = cache.as_modifications(0).await;
347+
let result = cache.as_modifications(0, &STOPWATCH).await;
348348
let mods = result.unwrap().modifications;
349349
for m in mods {
350350
match m {
@@ -386,7 +386,7 @@ async fn offchain_trigger_vid_collision_without_fix() {
386386
.set(band1_key.clone(), band1_data, block, None)
387387
.await
388388
.unwrap();
389-
let result1 = cache1.as_modifications(block).await.unwrap();
389+
let result1 = cache1.as_modifications(block, &STOPWATCH).await.unwrap();
390390

391391
// Simulate second offchain trigger: another fresh EntityCache (vid_seq ALSO starts at 100)
392392
let store2 = Arc::new(MockStore::new(BTreeMap::new()));
@@ -397,7 +397,7 @@ async fn offchain_trigger_vid_collision_without_fix() {
397397
.set(band2_key.clone(), band2_data, block, None)
398398
.await
399399
.unwrap();
400-
let result2 = cache2.as_modifications(block).await.unwrap();
400+
let result2 = cache2.as_modifications(block, &STOPWATCH).await.unwrap();
401401

402402
// Extract VIDs from both modifications
403403
let vid1 = match &result1.modifications[0] {
@@ -445,7 +445,7 @@ async fn offchain_trigger_vid_no_collision_with_fix() {
445445

446446
// THE FIX: capture vid_seq BEFORE as_modifications consumes the cache
447447
let next_vid_seq = cache1.vid_seq;
448-
let result1 = cache1.as_modifications(block).await.unwrap();
448+
let result1 = cache1.as_modifications(block, &STOPWATCH).await.unwrap();
449449

450450
// Second offchain trigger: set vid_seq to where first trigger left off
451451
let store2 = Arc::new(MockStore::new(BTreeMap::new()));
@@ -457,7 +457,7 @@ async fn offchain_trigger_vid_no_collision_with_fix() {
457457
.set(band2_key.clone(), band2_data, block, None)
458458
.await
459459
.unwrap();
460-
let result2 = cache2.as_modifications(block).await.unwrap();
460+
let result2 = cache2.as_modifications(block, &STOPWATCH).await.unwrap();
461461

462462
let vid1 = match &result1.modifications[0] {
463463
EntityModification::Insert { data, .. } => data.vid(),

store/test-store/tests/postgres/aggregation.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ use graph::{
2626
};
2727
use graph_store_postgres::Store as DieselStore;
2828
use test_store::{
29-
create_test_subgraph, remove_subgraphs, run_test_sequentially, BLOCKS, LOGGER, METRICS_REGISTRY,
29+
create_test_subgraph, remove_subgraphs, run_test_sequentially, BLOCKS, LOGGER,
30+
METRICS_REGISTRY, STOPWATCH,
3031
};
3132

3233
const SCHEMA: &str = r#"
@@ -110,7 +111,7 @@ pub async fn insert_entities(
110111
let mut entity_cache = EntityCache::new(Arc::new(store.clone()));
111112
entity_cache.append(ops);
112113
let mods = entity_cache
113-
.as_modifications(block_ptr_to.number)
114+
.as_modifications(block_ptr_to.number, &STOPWATCH)
114115
.await
115116
.expect("failed to convert to modifications")
116117
.modifications;

0 commit comments

Comments
 (0)