88 } ,
99 key_store,
1010 network:: Network ,
11+ oracle:: PricePublishingMetadata ,
1112 } ,
12- crate :: agent:: {
13- market_schedule:: MarketSchedule ,
14- remote_keypair_loader:: {
15- KeypairRequest ,
16- RemoteKeypairLoader ,
17- } ,
13+ crate :: agent:: remote_keypair_loader:: {
14+ KeypairRequest ,
15+ RemoteKeypairLoader ,
1816 } ,
1917 anyhow:: {
2018 anyhow,
6866 sync:: {
6967 mpsc:: {
7068 self ,
71- error:: TryRecvError ,
7269 Sender ,
7370 } ,
7471 oneshot,
@@ -174,7 +171,9 @@ pub fn spawn_exporter(
174171 network : Network ,
175172 rpc_url : & str ,
176173 rpc_timeout : Duration ,
177- publisher_permissions_rx : watch:: Receiver < HashMap < Pubkey , HashMap < Pubkey , MarketSchedule > > > ,
174+ publisher_permissions_rx : watch:: Receiver <
175+ HashMap < Pubkey , HashMap < Pubkey , PricePublishingMetadata > > ,
176+ > ,
178177 key_store : KeyStore ,
179178 local_store_tx : Sender < store:: local:: Message > ,
180179 global_store_tx : Sender < store:: global:: Lookup > ,
@@ -262,10 +261,11 @@ pub struct Exporter {
262261 inflight_transactions_tx : Sender < Signature > ,
263262
264263 /// publisher => { permissioned_price => market hours } as read by the oracle module
265- publisher_permissions_rx : watch:: Receiver < HashMap < Pubkey , HashMap < Pubkey , MarketSchedule > > > ,
264+ publisher_permissions_rx :
265+ watch:: Receiver < HashMap < Pubkey , HashMap < Pubkey , PricePublishingMetadata > > > ,
266266
267267 /// Currently known permissioned prices of this publisher along with their market hours
268- our_prices : HashMap < Pubkey , MarketSchedule > ,
268+ our_prices : HashMap < Pubkey , PricePublishingMetadata > ,
269269
270270 /// Interval to update the dynamic price (if enabled)
271271 dynamic_compute_unit_price_update_interval : Interval ,
@@ -289,7 +289,9 @@ impl Exporter {
289289 global_store_tx : Sender < store:: global:: Lookup > ,
290290 network_state_rx : watch:: Receiver < NetworkState > ,
291291 inflight_transactions_tx : Sender < Signature > ,
292- publisher_permissions_rx : watch:: Receiver < HashMap < Pubkey , HashMap < Pubkey , MarketSchedule > > > ,
292+ publisher_permissions_rx : watch:: Receiver <
293+ HashMap < Pubkey , HashMap < Pubkey , PricePublishingMetadata > > ,
294+ > ,
293295 keypair_request_tx : mpsc:: Sender < KeypairRequest > ,
294296 logger : Logger ,
295297 ) -> Self {
@@ -432,22 +434,30 @@ impl Exporter {
432434 async fn get_permissioned_updates ( & mut self ) -> Result < Vec < ( PriceIdentifier , PriceInfo ) > > {
433435 let local_store_contents = self . fetch_local_store_contents ( ) . await ?;
434436
435- let now = Utc :: now ( ) . timestamp ( ) ;
437+ let publish_keypair = self . get_publish_keypair ( ) . await ?;
438+ self . update_our_prices ( & publish_keypair. pubkey ( ) ) ;
439+
440+ let now = Utc :: now ( ) . naive_utc ( ) ;
441+
442+ debug ! ( self . logger, "Exporter: filtering prices permissioned to us" ;
443+ "our_prices" => format!( "{:?}" , self . our_prices. keys( ) ) ,
444+ "publish_pubkey" => publish_keypair. pubkey( ) . to_string( ) ,
445+ ) ;
436446
437447 // Filter the contents to only include information we haven't already sent,
438448 // and to ignore stale information.
439- let fresh_updates = local_store_contents
449+ Ok ( local_store_contents
440450 . into_iter ( )
441451 . filter ( |( _identifier, info) | {
442452 // Filter out timestamps that are old
443- ( now - info. timestamp ) < self . config . staleness_threshold . as_secs ( ) as i64
453+ now < info. timestamp + self . config . staleness_threshold
444454 } )
445455 . filter ( |( identifier, info) | {
446456 // Filter out unchanged price data if the max delay wasn't reached
447457
448458 if let Some ( last_info) = self . last_published_state . get ( identifier) {
449- if info. timestamp . saturating_sub ( last_info . timestamp )
450- > self . config . unchanged_publish_threshold . as_secs ( ) as i64
459+ if info. timestamp
460+ > last_info . timestamp + self . config . unchanged_publish_threshold
451461 {
452462 true // max delay since last published state reached, we publish anyway
453463 } else {
@@ -457,33 +467,17 @@ impl Exporter {
457467 true // No prior data found, letting the price through
458468 }
459469 } )
460- . collect :: < Vec < _ > > ( ) ;
461-
462- let publish_keypair = self . get_publish_keypair ( ) . await ?;
463-
464- self . update_our_prices ( & publish_keypair. pubkey ( ) ) ;
465-
466- debug ! ( self . logger, "Exporter: filtering prices permissioned to us" ;
467- "our_prices" => format!( "{:?}" , self . our_prices. keys( ) ) ,
468- "publish_pubkey" => publish_keypair. pubkey( ) . to_string( ) ,
469- ) ;
470-
471- // Get a fresh system time
472- let now = Utc :: now ( ) ;
473-
474- // Filter out price accounts we're not permissioned to update
475- Ok ( fresh_updates
476- . into_iter ( )
477470 . filter ( |( id, _data) | {
478471 let key_from_id = Pubkey :: from ( ( * id) . clone ( ) . to_bytes ( ) ) ;
479- if let Some ( schedule) = self . our_prices . get ( & key_from_id) {
480- let ret = schedule. can_publish_at ( & now) ;
472+ if let Some ( publisher_permission) = self . our_prices . get ( & key_from_id) {
473+ let now_utc = Utc :: now ( ) ;
474+ let ret = publisher_permission. schedule . can_publish_at ( & now_utc) ;
481475
482476 if !ret {
483477 debug ! ( self . logger, "Exporter: Attempted to publish price outside market hours" ;
484478 "price_account" => key_from_id. to_string( ) ,
485- "schedule" => format!( "{:?}" , schedule) ,
486- "utc_time" => now . format( "%c" ) . to_string( ) ,
479+ "schedule" => format!( "{:?}" , publisher_permission . schedule) ,
480+ "utc_time" => now_utc . format( "%c" ) . to_string( ) ,
487481 ) ;
488482 }
489483
@@ -501,6 +495,33 @@ impl Exporter {
501495 false
502496 }
503497 } )
498+ . filter ( |( id, info) | {
499+ // Filtering out prices that are being updated too frequently according to publisher_permission.publish_interval
500+ let last_info = match self . last_published_state . get ( id) {
501+ Some ( last_info) => last_info,
502+ None => {
503+ // No prior data found, letting the price through
504+ return true ;
505+ }
506+ } ;
507+
508+ let key_from_id = Pubkey :: from ( ( * id) . clone ( ) . to_bytes ( ) ) ;
509+ let publisher_metadata = match self . our_prices . get ( & key_from_id) {
510+ Some ( metadata) => metadata,
511+ None => {
512+ // Should never happen since we have filtered out the price above
513+ return false ;
514+ }
515+ } ;
516+
517+ if let Some ( publish_interval) = publisher_metadata. publish_interval {
518+ if info. timestamp < last_info. timestamp + publish_interval {
519+ // Updating the price too soon after the last update, skipping
520+ return false ;
521+ }
522+ }
523+ true
524+ } )
504525 . collect :: < Vec < _ > > ( ) )
505526 }
506527
@@ -623,9 +644,9 @@ impl Exporter {
623644 let network_state = * self . network_state_rx . borrow ( ) ;
624645 for ( identifier, price_info_result) in refreshed_batch {
625646 let price_info = price_info_result?;
647+ let now = Utc :: now ( ) . naive_utc ( ) ;
626648
627- let stale_price = ( Utc :: now ( ) . timestamp ( ) - price_info. timestamp )
628- > self . config . staleness_threshold . as_secs ( ) as i64 ;
649+ let stale_price = now > price_info. timestamp + self . config . staleness_threshold ;
629650 if stale_price {
630651 continue ;
631652 }
0 commit comments