@@ -73,12 +73,16 @@ pub mod store;
7373use {
7474 self :: {
7575 config:: Config ,
76- pythd:: api:: rpc,
76+ pythd:: {
77+ adapter:: notifier,
78+ api:: rpc,
79+ } ,
7780 solana:: network,
7881 } ,
7982 anyhow:: Result ,
8083 futures_util:: future:: join_all,
8184 slog:: Logger ,
85+ std:: sync:: Arc ,
8286 tokio:: sync:: {
8387 broadcast,
8488 mpsc,
@@ -113,8 +117,7 @@ impl Agent {
113117
114118 // Create the channels
115119 // TODO: make all components listen to shutdown signal
116- let ( shutdown_tx, shutdown_rx) =
117- broadcast:: channel ( self . config . channel_capacities . shutdown ) ;
120+ let ( shutdown_tx, _) = broadcast:: channel ( self . config . channel_capacities . shutdown ) ;
118121 let ( primary_oracle_updates_tx, primary_oracle_updates_rx) =
119122 mpsc:: channel ( self . config . channel_capacities . primary_oracle_updates ) ;
120123 let ( secondary_oracle_updates_tx, secondary_oracle_updates_rx) =
@@ -123,8 +126,6 @@ impl Agent {
123126 mpsc:: channel ( self . config . channel_capacities . global_store_lookup ) ;
124127 let ( local_store_tx, local_store_rx) =
125128 mpsc:: channel ( self . config . channel_capacities . local_store ) ;
126- let ( pythd_adapter_tx, pythd_adapter_rx) =
127- mpsc:: channel ( self . config . channel_capacities . pythd_adapter ) ;
128129 let ( primary_keypair_loader_tx, primary_keypair_loader_rx) = mpsc:: channel ( 10 ) ;
129130 let ( secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc:: channel ( 10 ) ;
130131
@@ -152,34 +153,38 @@ impl Agent {
152153 ) ?) ;
153154 }
154155
156+ // Create the Pythd Adapter.
157+ let adapter = Arc :: new ( pythd:: adapter:: Adapter :: new (
158+ self . config . pythd_adapter . clone ( ) ,
159+ global_store_lookup_tx. clone ( ) ,
160+ local_store_tx. clone ( ) ,
161+ logger. clone ( ) ,
162+ ) ) ;
163+
164+ // Create the Notifier task for the Pythd RPC.
165+ jhs. push ( tokio:: spawn ( notifier (
166+ adapter. clone ( ) ,
167+ shutdown_tx. subscribe ( ) ,
168+ ) ) ) ;
169+
155170 // Spawn the Global Store
156171 jhs. push ( store:: global:: spawn_store (
157172 global_store_lookup_rx,
158173 primary_oracle_updates_rx,
159174 secondary_oracle_updates_rx,
160- pythd_adapter_tx . clone ( ) ,
175+ adapter . clone ( ) ,
161176 logger. clone ( ) ,
162177 ) ) ;
163178
164179 // Spawn the Local Store
165180 jhs. push ( store:: local:: spawn_store ( local_store_rx, logger. clone ( ) ) ) ;
166181
167- // Spawn the Pythd Adapter
168- jhs. push ( pythd:: adapter:: spawn_adapter (
169- self . config . pythd_adapter . clone ( ) ,
170- pythd_adapter_rx,
171- global_store_lookup_tx. clone ( ) ,
172- local_store_tx. clone ( ) ,
173- shutdown_tx. subscribe ( ) ,
174- logger. clone ( ) ,
175- ) ) ;
176-
177182 // Spawn the Pythd API Server
178183 jhs. push ( tokio:: spawn ( rpc:: run (
179184 self . config . pythd_api_server . clone ( ) ,
180185 logger. clone ( ) ,
181- pythd_adapter_tx ,
182- shutdown_rx ,
186+ adapter ,
187+ shutdown_tx . subscribe ( ) ,
183188 ) ) ) ;
184189
185190 // Spawn the metrics server
0 commit comments