forked from solana-labs/solana
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathunified_scheduler.rs
299 lines (272 loc) · 10.9 KB
/
unified_scheduler.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
use {
agave_banking_stage_ingress_types::BankingPacketBatch,
assert_matches::assert_matches,
crossbeam_channel::unbounded,
itertools::Itertools,
log::*,
solana_core::{
banking_stage::unified_scheduler::ensure_banking_stage_setup,
banking_trace::BankingTracer,
consensus::{
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
progress_map::{ForkProgress, ProgressMap},
},
drop_bank_service::DropBankService,
repair::cluster_slot_state_verifier::{
DuplicateConfirmedSlots, DuplicateSlotsTracker, EpochSlotsFrozenSlots,
},
replay_stage::ReplayStage,
unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
},
solana_entry::entry::Entry,
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{
blockstore::Blockstore, create_new_tmp_ledger_auto_delete,
genesis_utils::create_genesis_config, leader_schedule_cache::LeaderScheduleCache,
},
solana_perf::packet::to_packet_batches,
solana_poh::poh_recorder::create_test_recorder,
solana_runtime::{
accounts_background_service::AbsRequestSender, bank::Bank, bank_forks::BankForks,
genesis_utils::GenesisConfigInfo, installed_scheduler_pool::SchedulingContext,
prioritization_fee_cache::PrioritizationFeeCache,
},
solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
solana_sdk::{
hash::Hash, pubkey::Pubkey, signature::Signer, signer::keypair::Keypair,
system_transaction, transaction::Result,
},
solana_streamer::socket::SocketAddrSpace,
solana_timings::ExecuteTimings,
solana_unified_scheduler_logic::{SchedulingMode, Task},
solana_unified_scheduler_pool::{
DefaultSchedulerPool, DefaultTaskHandler, HandlerContext, PooledScheduler, SchedulerPool,
TaskHandler,
},
std::{
collections::HashMap,
sync::{atomic::Ordering, Arc, Mutex},
thread::sleep,
time::Duration,
},
};
// Verifies that when a non-rooted (pruned) bank with an in-flight unified
// scheduler is dropped via DropBankService, the service waits for the
// scheduler to finish and the scheduler is returned to the pool afterwards.
#[test]
fn test_scheduler_waited_by_drop_bank_service() {
solana_logger::setup();
// Held by the test body to stall StallingHandler until pruning has started;
// released via `drop(lock_to_stall)` below.
static LOCK_TO_STALL: Mutex<()> = Mutex::new(());
// Task handler that blocks on LOCK_TO_STALL (plus a short sleep) before
// delegating to DefaultTaskHandler, so transaction execution is delayed
// until after the replay stage has pruned banks.
#[derive(Debug)]
struct StallingHandler;
impl TaskHandler for StallingHandler {
fn handle(
result: &mut Result<()>,
timings: &mut ExecuteTimings,
scheduling_context: &SchedulingContext,
task: &Task,
handler_context: &HandlerContext,
) {
info!("Stalling at StallingHandler::handle()...");
// Blocks here until the test drops its `lock_to_stall` guard.
*LOCK_TO_STALL.lock().unwrap();
// Wait a bit for the replay stage to prune banks
std::thread::sleep(std::time::Duration::from_secs(3));
info!("Now entering into DefaultTaskHandler::handle()...");
DefaultTaskHandler::handle(result, timings, scheduling_context, task, handler_context);
}
}
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10_000);
// Setup bankforks with unified scheduler enabled
let genesis_bank = Bank::new_for_tests(&genesis_config);
let bank_forks = BankForks::new_rw_arc(genesis_bank);
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
// Keep `pool_raw` so the test can observe pooled_scheduler_count() directly.
let pool_raw = SchedulerPool::<PooledScheduler<StallingHandler>, _>::new(
None,
None,
None,
None,
ignored_prioritization_fee_cache,
);
let pool = pool_raw.clone();
bank_forks.write().unwrap().install_scheduler_pool(pool);
let genesis = 0;
let genesis_bank = &bank_forks.read().unwrap().get(genesis).unwrap();
genesis_bank.set_fork_graph_in_program_cache(Arc::downgrade(&bank_forks));
// Create bank, which is pruned later
let pruned = 2;
let pruned_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), pruned);
let pruned_bank = bank_forks.write().unwrap().insert(pruned_bank);
// Create new root bank
let root = 3;
let root_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), root);
root_bank.freeze();
let root_hash = root_bank.hash();
bank_forks.write().unwrap().insert(root_bank);
let tx = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
&mint_keypair,
&solana_pubkey::new_rand(),
2,
genesis_config.hash(),
));
// Delay transaction execution to ensure transaction execution happens after termination has
// been started
let lock_to_stall = LOCK_TO_STALL.lock().unwrap();
pruned_bank
.schedule_transaction_executions([(tx, 0)].into_iter())
.unwrap();
drop(pruned_bank);
// The scheduler is still out of the pool: its task is stalled inside
// StallingHandler, so nothing has been returned yet.
assert_eq!(pool_raw.pooled_scheduler_count(), 0);
// Let StallingHandler proceed now that scheduling/dropping has begun.
drop(lock_to_stall);
// Create 2 channels to check actual pruned banks
let (drop_bank_sender1, drop_bank_receiver1) = unbounded();
let (drop_bank_sender2, drop_bank_receiver2) = unbounded();
let drop_bank_service = DropBankService::new(drop_bank_receiver2);
info!("calling handle_new_root()...");
// Mostly copied from: test_handle_new_root()
{
let mut heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new((root, root_hash));
let mut progress = ProgressMap::default();
for i in genesis..=root {
progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0));
}
let mut duplicate_slots_tracker: DuplicateSlotsTracker =
vec![root - 1, root, root + 1].into_iter().collect();
let mut duplicate_confirmed_slots: DuplicateConfirmedSlots = vec![root - 1, root, root + 1]
.into_iter()
.map(|s| (s, Hash::default()))
.collect();
let mut unfrozen_gossip_verified_vote_hashes: UnfrozenGossipVerifiedVoteHashes =
UnfrozenGossipVerifiedVoteHashes {
votes_per_slot: vec![root - 1, root, root + 1]
.into_iter()
.map(|s| (s, HashMap::new()))
.collect(),
};
let mut epoch_slots_frozen_slots: EpochSlotsFrozenSlots = vec![root - 1, root, root + 1]
.into_iter()
.map(|slot| (slot, Hash::default()))
.collect();
ReplayStage::handle_new_root(
root,
&bank_forks,
&mut progress,
&AbsRequestSender::default(),
None,
&mut heaviest_subtree_fork_choice,
&mut duplicate_slots_tracker,
&mut duplicate_confirmed_slots,
&mut unfrozen_gossip_verified_vote_hashes,
&mut true,
&mut Vec::new(),
&mut epoch_slots_frozen_slots,
&drop_bank_sender1,
)
.unwrap();
}
// Receive pruned banks from the above handle_new_root
let pruned_banks = drop_bank_receiver1.recv().unwrap();
// Both the genesis bank and the `pruned` bank fall off the rooted fork.
assert_eq!(
pruned_banks
.iter()
.map(|b| b.slot())
.sorted()
.collect::<Vec<_>>(),
vec![genesis, pruned]
);
// Relay the pruned banks into the second channel so DropBankService drops them.
info!("sending pruned banks to DropBankService...");
drop_bank_sender2.send(pruned_banks).unwrap();
info!("joining the drop bank service...");
// Close both channels so the service's receive loop terminates and join() returns.
drop((
(drop_bank_sender1, drop_bank_receiver1),
(drop_bank_sender2,),
));
drop_bank_service.join().unwrap();
info!("finally joined the drop bank service!");
// the scheduler used by the pruned_bank has been returned to the pool now.
assert_eq!(pool_raw.pooled_scheduler_count(), 1);
}
// Verifies end-to-end block production through the unified scheduler: a
// banking packet batch sent over the banking-stage channel is executed and
// committed into a block-production bank recorded by PoH.
#[test]
fn test_scheduler_producing_blocks() {
solana_logger::setup();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10_000);
let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
// Setup bank_forks with block-producing unified scheduler enabled
let genesis_bank = Bank::new_for_tests(&genesis_config);
let bank_forks = BankForks::new_rw_arc(genesis_bank);
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let genesis_bank = bank_forks.read().unwrap().working_bank_with_scheduler();
genesis_bank.set_fork_graph_in_program_cache(Arc::downgrade(&bank_forks));
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&genesis_bank));
// `signal_receiver` yields the entries PoH records; used for the final assertion.
let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(
genesis_bank.clone(),
blockstore.clone(),
None,
Some(leader_schedule_cache),
);
let pool = DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
let channels = {
let banking_tracer = BankingTracer::new_disabled();
banking_tracer.create_channels(true)
};
let cluster_info = {
let keypair = Arc::new(Keypair::new());
let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
Arc::new(ClusterInfo::new(
node.info,
keypair,
SocketAddrSpace::Unspecified,
))
};
// Wire the banking-stage side of the pool before installing it into bank_forks.
ensure_banking_stage_setup(&pool, &bank_forks, &channels, &cluster_info, &poh_recorder);
bank_forks.write().unwrap().install_scheduler_pool(pool);
// Wait until genesis_bank reaches its tick height...
while poh_recorder.read().unwrap().bank().is_some() {
sleep(Duration::from_millis(100));
}
// Create test tx
let tx = system_transaction::transfer(
&mint_keypair,
&solana_pubkey::new_rand(),
1,
genesis_config.hash(),
);
let banking_packet_batch = BankingPacketBatch::new(to_packet_batches(&vec![tx.clone(); 1], 1));
let tx = RuntimeTransaction::from_transaction_for_tests(tx);
// Create tpu_bank
let tpu_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), 2);
let tpu_bank = bank_forks
.write()
.unwrap()
.insert_with_scheduling_mode(SchedulingMode::BlockProduction, tpu_bank);
poh_recorder
.write()
.unwrap()
.set_bank(tpu_bank.clone_with_scheduler(), false);
let tpu_bank = bank_forks.read().unwrap().working_bank_with_scheduler();
// Sanity check: nothing executed yet before the packet batch is sent.
assert_eq!(tpu_bank.transaction_count(), 0);
// Now, send transaction
channels
.unified_sender()
.send(banking_packet_batch)
.unwrap();
// Wait until tpu_bank reaches its tick height...
while poh_recorder.read().unwrap().bank().is_some() {
sleep(Duration::from_millis(100));
}
assert_matches!(tpu_bank.wait_for_completed_scheduler(), Some((Ok(()), _)));
// Verify transactions are committed and poh-recorded
assert_eq!(tpu_bank.transaction_count(), 1);
// The first non-tick entry PoH recorded must contain exactly our transaction.
assert_matches!(
signal_receiver.into_iter().find(|(_, (entry, _))| !entry.is_tick()),
Some((_, (Entry {transactions, ..}, _))) if transactions == [tx.to_versioned_transaction()]
);
// Stop things.
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}