@@ -18,7 +18,7 @@ use graph::prelude::{
18
18
SubgraphStore as _, BLOCK_NUMBER_MAX ,
19
19
} ;
20
20
use graph:: schema:: { EntityKey , EntityType , InputSchema } ;
21
- use graph:: slog:: { debug, info, warn} ;
21
+ use graph:: slog:: { debug, info, trace , warn} ;
22
22
use graph:: tokio:: select;
23
23
use graph:: tokio:: sync:: Notify ;
24
24
use graph:: tokio:: task:: JoinHandle ;
@@ -355,6 +355,7 @@ impl SyncStore {
355
355
356
356
fn get_derived (
357
357
& self ,
358
+ _logger : & Logger ,
358
359
key : & DerivedEntityQuery ,
359
360
block : BlockNumber ,
360
361
excluded_keys : Vec < EntityKey > ,
@@ -1222,8 +1223,15 @@ impl Queue {
1222
1223
1223
1224
fn get_derived (
1224
1225
& self ,
1226
+ logger : & Logger ,
1225
1227
derived_query : & DerivedEntityQuery ,
1226
1228
) -> Result < BTreeMap < EntityKey , Entity > , StoreError > {
1229
+ trace ! ( logger, "queue.get_derived: Starting derived entity query" ;
1230
+ "entity_type" => derived_query. entity_type. to_string( ) ,
1231
+ "entity_field" => derived_query. entity_field. to_string( ) ,
1232
+ "value" => format!( "{:?}" , derived_query. value)
1233
+ ) ;
1234
+
1227
1235
fn is_related ( derived_query : & DerivedEntityQuery , entity : & Entity ) -> bool {
1228
1236
entity
1229
1237
. get ( & derived_query. entity_field )
@@ -1247,6 +1255,10 @@ impl Queue {
1247
1255
} )
1248
1256
}
1249
1257
1258
+ trace ! ( logger, "queue.get_derived: Starting queue fold operation" ;
1259
+ "entity_type" => derived_query. entity_type. to_string( )
1260
+ ) ;
1261
+
1250
1262
// Get entities from entries in the queue
1251
1263
let ( entities_in_queue, query_block) = BlockTracker :: fold (
1252
1264
& self . queue ,
@@ -1255,30 +1267,76 @@ impl Queue {
1255
1267
// Since we are going newest to oldest, do not clobber
1256
1268
// already existing entries in map as that would make us
1257
1269
// produce stale values
1270
+
1271
+ let before_count = map. len ( ) ;
1258
1272
for ( k, v) in effective_ops ( batch, derived_query, at) {
1259
1273
if !map. contains_key ( & k) {
1260
1274
map. insert ( k, v) ;
1261
1275
}
1262
1276
}
1277
+ let after_count = map. len ( ) ;
1278
+
1279
+ trace ! ( logger, "queue.get_derived: Processed batch" ;
1280
+ "entity_type" => derived_query. entity_type. to_string( ) ,
1281
+ "block_number" => at,
1282
+ "new_entities_found" => ( after_count - before_count) ,
1283
+ "total_entities_in_map" => after_count
1284
+ ) ;
1285
+
1263
1286
map
1264
1287
} ,
1265
1288
) ;
1266
1289
1290
+ trace ! ( logger, "queue.get_derived: Completed queue fold operation" ;
1291
+ "entity_type" => derived_query. entity_type. to_string( ) ,
1292
+ "entities_in_queue" => entities_in_queue. len( ) ,
1293
+ "query_block" => query_block,
1294
+
1295
+ ) ;
1296
+
1267
1297
let excluded_keys: Vec < EntityKey > = entities_in_queue. keys ( ) . cloned ( ) . collect ( ) ;
1298
+ trace ! ( logger, "queue.get_derived: Excluded keys from database query" ;
1299
+ "entity_type" => derived_query. entity_type. to_string( ) ,
1300
+ "excluded_keys_count" => excluded_keys. len( )
1301
+ ) ;
1268
1302
1269
1303
// We filter to exclude the entities ids that we already have from the queue
1304
+ trace ! ( logger, "queue.get_derived: Querying database for derived entities" ;
1305
+ "entity_type" => derived_query. entity_type. to_string( ) ,
1306
+ "query_block" => query_block
1307
+ ) ;
1308
+
1309
+ let db_query_start = Instant :: now ( ) ;
1270
1310
let mut items_from_database =
1271
1311
self . store
1272
- . get_derived ( derived_query, query_block, excluded_keys) ?;
1312
+ . get_derived ( logger, derived_query, query_block, excluded_keys) ?;
1313
+ let db_query_duration = db_query_start. elapsed ( ) ;
1314
+
1315
+ trace ! ( logger, "queue.get_derived: Retrieved entities from database" ;
1316
+ "entity_type" => derived_query. entity_type. to_string( ) ,
1317
+ "items_from_database" => items_from_database. len( ) ,
1318
+ "duration_ms" => db_query_duration. as_millis( )
1319
+ ) ;
1273
1320
1274
1321
// Extend the store results with the entities from the queue.
1275
1322
// This overwrites any entitiy from the database with the same key from queue
1276
1323
let items_from_queue: BTreeMap < EntityKey , Entity > = entities_in_queue
1277
1324
. into_iter ( )
1278
1325
. filter_map ( |( key, entity) | entity. map ( |entity| ( key, entity) ) )
1279
1326
. collect ( ) ;
1327
+
1328
+ trace ! ( logger, "queue.get_derived: Filtered queue entities" ;
1329
+ "entity_type" => derived_query. entity_type. to_string( ) ,
1330
+ "items_from_queue" => items_from_queue. len( )
1331
+ ) ;
1332
+
1280
1333
items_from_database. extend ( items_from_queue) ;
1281
1334
1335
+ trace ! ( logger, "queue.get_derived: Completed derived entity query" ;
1336
+ "entity_type" => derived_query. entity_type. to_string( ) ,
1337
+ "total_entities" => items_from_database. len( ) ,
1338
+ ) ;
1339
+
1282
1340
Ok ( items_from_database)
1283
1341
}
1284
1342
@@ -1434,11 +1492,26 @@ impl Writer {
1434
1492
1435
1493
fn get_derived (
1436
1494
& self ,
1495
+ logger : & Logger ,
1437
1496
key : & DerivedEntityQuery ,
1438
1497
) -> Result < BTreeMap < EntityKey , Entity > , StoreError > {
1439
1498
match self {
1440
- Writer :: Sync ( store) => store. get_derived ( key, BLOCK_NUMBER_MAX , vec ! [ ] ) ,
1441
- Writer :: Async { queue, .. } => queue. get_derived ( key) ,
1499
+ Writer :: Sync ( store) => {
1500
+ trace ! ( logger, "writer.get_derived: Getting derived entities using sync store" ;
1501
+ "entity_type" => key. entity_type. to_string( ) ,
1502
+ "entity_field" => key. entity_field. to_string( ) ,
1503
+ "value" => format!( "{:?}" , key. value)
1504
+ ) ;
1505
+ store. get_derived ( logger, key, BLOCK_NUMBER_MAX , vec ! [ ] )
1506
+ }
1507
+ Writer :: Async { queue, .. } => {
1508
+ trace ! ( logger, "writer.get_derived: Getting derived entities using async store" ;
1509
+ "entity_type" => key. entity_type. to_string( ) ,
1510
+ "entity_field" => key. entity_field. to_string( ) ,
1511
+ "value" => format!( "{:?}" , key. value)
1512
+ ) ;
1513
+ queue. get_derived ( logger, key)
1514
+ }
1442
1515
}
1443
1516
}
1444
1517
@@ -1562,9 +1635,10 @@ impl ReadStore for WritableStore {
1562
1635
1563
1636
fn get_derived (
1564
1637
& self ,
1638
+ logger : & Logger ,
1565
1639
key : & DerivedEntityQuery ,
1566
1640
) -> Result < BTreeMap < EntityKey , Entity > , StoreError > {
1567
- self . writer . get_derived ( key)
1641
+ self . writer . get_derived ( logger , key)
1568
1642
}
1569
1643
1570
1644
fn input_schema ( & self ) -> InputSchema {
0 commit comments