Skip to content

Commit

Permalink
Maintain original queue capacity for unprocessed packet buffer (solan…
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge authored Oct 28, 2022
1 parent 0a148b2 commit 22ce49a
Showing 1 changed file with 51 additions and 36 deletions.
87 changes: 51 additions & 36 deletions core/src/unprocessed_transaction_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,13 +426,15 @@ impl ThreadLocalUnprocessedPackets {
let mut total_filter_packets_us: u64 = 0;
let mut dropped_tx_before_forwarding_count: usize = 0;

let mut original_priority_queue = self.swap_priority_queue();
let mut original_priority_queue = self.take_priority_queue();
let original_capacity = original_priority_queue.capacity();
let mut new_priority_queue = MinMaxHeap::with_capacity(original_capacity);

// indicates if `forward_buffer` still accept more packets, see details at
// `ForwardPacketBatchesByAccounts.rs`.
let mut accepting_packets = true;
// batch iterate through self.unprocessed_packet_batches in desc priority order
let retained_priority_queue: MinMaxHeap<Arc<ImmutableDeserializedPacket>> =
new_priority_queue.extend(
original_priority_queue
.drain_desc()
.chunks(batch_size)
Expand Down Expand Up @@ -522,11 +524,12 @@ impl ThreadLocalUnprocessedPackets {
);
packets_to_process
}
})
.collect();
}),
);

// replace packet priority queue
self.unprocessed_packet_batches.packet_priority_queue = retained_priority_queue;
self.unprocessed_packet_batches.packet_priority_queue = new_priority_queue;
self.verify_priority_queue(original_capacity);

inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding",
Expand All @@ -543,14 +546,30 @@ impl ThreadLocalUnprocessedPackets {
}

/// Take self.unprocessed_packet_batches's priority_queue out, leave empty MinMaxHeap in its place.
fn swap_priority_queue(&mut self) -> MinMaxHeap<Arc<ImmutableDeserializedPacket>> {
let capacity = self.unprocessed_packet_batches.capacity();
fn take_priority_queue(&mut self) -> MinMaxHeap<Arc<ImmutableDeserializedPacket>> {
std::mem::replace(
&mut self.unprocessed_packet_batches.packet_priority_queue,
MinMaxHeap::with_capacity(capacity),
MinMaxHeap::new(), // <-- no need to reserve capacity as we will replace this
)
}

/// Verify that the priority queue and map are consistent and that original capacity is maintained.
fn verify_priority_queue(&self, original_capacity: usize) {
// Assert unprocessed queue is still consistent and maintains original capacity
assert_eq!(
self.unprocessed_packet_batches
.packet_priority_queue
.capacity(),
original_capacity
);
assert_eq!(
self.unprocessed_packet_batches.packet_priority_queue.len(),
self.unprocessed_packet_batches
.message_hash_to_transaction
.len()
);
}

/// sanitize un-forwarded packet into SanitizedTransaction for validation and forwarding.
fn sanitize_unforwarded_packets(
&mut self,
Expand Down Expand Up @@ -701,35 +720,31 @@ impl ThreadLocalUnprocessedPackets {
where
F: FnMut(&Vec<Arc<ImmutableDeserializedPacket>>) -> Option<Vec<usize>>,
{
let mut retryable_packets = self.swap_priority_queue();
let retryable_packets: MinMaxHeap<Arc<ImmutableDeserializedPacket>> = retryable_packets
.drain_desc()
.chunks(batch_size)
.into_iter()
.flat_map(|packets_to_process| {
let packets_to_process = packets_to_process.into_iter().collect_vec();
if let Some(retryable_transaction_indexes) =
processing_function(&packets_to_process)
{
self.collect_retained_packets(
&packets_to_process,
&retryable_transaction_indexes,
)
} else {
packets_to_process
}
})
.collect::<MinMaxHeap<_>>();

self.unprocessed_packet_batches.packet_priority_queue = retryable_packets;

// Assert unprocessed queue is still consistent
assert_eq!(
self.unprocessed_packet_batches.packet_priority_queue.len(),
self.unprocessed_packet_batches
.message_hash_to_transaction
.len()
let mut retryable_packets = self.take_priority_queue();
let original_capacity = retryable_packets.capacity();
let mut new_retryable_packets = MinMaxHeap::with_capacity(original_capacity);
new_retryable_packets.extend(
retryable_packets
.drain_desc()
.chunks(batch_size)
.into_iter()
.flat_map(|packets_to_process| {
let packets_to_process = packets_to_process.into_iter().collect_vec();
if let Some(retryable_transaction_indexes) =
processing_function(&packets_to_process)
{
self.collect_retained_packets(
&packets_to_process,
&retryable_transaction_indexes,
)
} else {
packets_to_process
}
}),
);

self.unprocessed_packet_batches.packet_priority_queue = new_retryable_packets;
self.verify_priority_queue(original_capacity);
}
}

Expand Down

0 comments on commit 22ce49a

Please sign in to comment.