Skip to content

Commit

Permalink
Merge branch 'main' into INS-33198-redis-to-valkey-part-2
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Nov 22, 2024
2 parents 3f31e4e + 395ca02 commit b6ad0d5
Showing 1 changed file with 57 additions and 69 deletions.
126 changes: 57 additions & 69 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,13 +343,9 @@ pub(crate) struct KafkaSinkCluster {
#[derive(Debug)]
enum PendingRequestState {
/// A route has been determined for this request but it has not yet been sent.
Routed {
destination: Destination,
request: Message,
},
Routed { request: Message },
/// The request has been sent to the specified broker and we are now awaiting a response from that broker.
Sent {
destination: Destination,
/// How many responses must be received before this response is received.
/// When this is 0 the next response from the broker will be for this request.
/// This field must be manually decremented when another response for this broker comes through.
Expand All @@ -360,8 +356,6 @@ enum PendingRequestState {
/// The broker has returned a Response to this request.
/// Returning this response may be delayed until a response to an earlier request comes back from another broker.
Received {
// TODO: move this into the parent type
destination: Destination,
response: Message,
/// Some message types store the request here in case they need to resend it.
// TODO: if we ever turn the Message into a CoW type we will be able to
Expand All @@ -371,11 +365,8 @@ enum PendingRequestState {
}

impl PendingRequestState {
fn routed(broker_id: BrokerId, request: Message) -> Self {
Self::Routed {
destination: Destination::Id(broker_id),
request,
}
fn routed(request: Message) -> Self {
Self::Routed { request }
}
}

Expand All @@ -396,6 +387,9 @@ enum PendingRequestTy {

struct PendingRequest {
state: PendingRequestState,

destination: Destination,

/// Type of the request sent
ty: PendingRequestTy,
/// Combine the next N responses into a single response
Expand Down Expand Up @@ -1209,7 +1203,8 @@ The connection to the client has been closed."
let destination = random_broker_id(&self.nodes, &mut self.rng);
tracing::debug!("Routing request to random broker {}", destination.0);
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
Expand All @@ -1228,7 +1223,8 @@ The connection to the client has been closed."
let destination = random_broker_id(&self.nodes, &mut self.rng);

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
Expand All @@ -1250,7 +1246,8 @@ The connection to the client has been closed."

T::reassemble(request_frame, topic_data);
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
Expand All @@ -1275,7 +1272,8 @@ The connection to the client has been closed."
let request_frame = T::get_request_frame(&mut request);
T::reassemble(request_frame, topic_data);
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
ty: PendingRequestTy::Other,
combine_responses,
});
Expand Down Expand Up @@ -1432,7 +1430,8 @@ The connection to the client has been closed."
let destination = random_broker_id(&self.nodes, &mut self.rng);

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
// we dont need special handling for fetch, so just use Other
ty: PendingRequestTy::Other,
combine_responses: 1,
Expand All @@ -1455,7 +1454,8 @@ The connection to the client has been closed."

fetch.topics = topics;
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
// we dont need special handling for fetch, so just use Other
ty: PendingRequestTy::Other,
combine_responses: 1,
Expand Down Expand Up @@ -1494,7 +1494,8 @@ The connection to the client has been closed."
fetch.topics = topics;
}
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
ty: PendingRequestTy::Fetch {
originally_sent_at: Instant::now(),
max_wait_ms,
Expand Down Expand Up @@ -2017,28 +2018,26 @@ The connection to the client has been closed."

let mut broker_to_routed_requests: HashMap<Destination, RoutedRequests> = HashMap::new();
for i in 0..self.pending_requests.len() {
if let PendingRequestState::Routed { destination, .. } = &self.pending_requests[i].state
{
let routed_requests = broker_to_routed_requests
.entry(*destination)
.or_insert_with(|| RoutedRequests {
requests: vec![],
already_pending: self
.pending_requests
.iter()
.filter(|pending_request| {
if let PendingRequestState::Sent {
destination: check_destination,
..
} = &pending_request.state
{
check_destination == destination
} else {
false
}
})
.count(),
});
if let PendingRequestState::Routed { .. } = &self.pending_requests[i].state {
let destination = self.pending_requests[i].destination;
let routed_requests =
broker_to_routed_requests
.entry(destination)
.or_insert_with(|| RoutedRequests {
requests: vec![],
already_pending: self
.pending_requests
.iter()
.filter(|pending_request| {
if let PendingRequestState::Sent { .. } = &pending_request.state
{
pending_request.destination == destination
} else {
false
}
})
.count(),
});

let request = match self.pending_requests[i].ty {
PendingRequestTy::Fetch { .. } => {
Expand All @@ -2056,7 +2055,6 @@ The connection to the client has been closed."
PendingRequestTy::Other => None,
};
let mut value = PendingRequestState::Sent {
destination: *destination,
index: routed_requests.requests.len() + routed_requests.already_pending,
request,
};
Expand Down Expand Up @@ -2163,19 +2161,15 @@ The connection to the client has been closed."
for response in self.temp_responses_buffer.drain(..) {
let mut response = Some(response);
for pending_request in &mut self.pending_requests {
if let PendingRequestState::Sent {
destination,
index,
request,
} = &mut pending_request.state
if let PendingRequestState::Sent { index, request } =
&mut pending_request.state
{
if destination == connection_destination {
if &pending_request.destination == connection_destination {
// Store the PendingRequestState::Received at the location of the next PendingRequestState::Sent
// All other PendingRequestState::Sent need to be decremented, in order to determine the PendingRequestState::Sent
// to be used next time, and the time after that, and ...
if *index == 0 {
pending_request.state = PendingRequestState::Received {
destination: *destination,
response: response.take().unwrap(),
request: request.take(),
};
Expand Down Expand Up @@ -2242,14 +2236,10 @@ The connection to the client has been closed."

for i in 0..combine_responses {
let pending_request = &mut self.pending_requests[i];
if let PendingRequestState::Received {
destination,
request,
..
} = &mut pending_request.state
if let PendingRequestState::Received { request, .. } =
&mut pending_request.state
{
pending_request.state = PendingRequestState::Routed {
destination: *destination,
request: request.take().unwrap(),
}
} else {
Expand Down Expand Up @@ -2278,12 +2268,8 @@ The connection to the client has been closed."
} else {
let drain = self.pending_requests.drain(..combine_responses).map(|x| {
if let PendingRequest {
state:
PendingRequestState::Received {
response,
destination,
..
},
state: PendingRequestState::Received { response, .. },
destination,
..
} = x
{
Expand Down Expand Up @@ -3353,10 +3339,8 @@ The connection to the client has been closed."
"route_to_control_connection cannot be called after auth is complete. Otherwise it would collide with control_send_receive"
);
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::Routed {
destination: Destination::ControlConnection,
request,
},
state: PendingRequestState::Routed { request },
destination: Destination::ControlConnection,
ty: PendingRequestTy::Other,
combine_responses: 1,
});
Expand All @@ -3378,7 +3362,8 @@ The connection to the client has been closed."
};

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
Expand All @@ -3405,7 +3390,8 @@ The connection to the client has been closed."
);

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
ty: PendingRequestTy::RoutedToGroup(group_id),
combine_responses: 1,
});
Expand All @@ -3432,7 +3418,8 @@ The connection to the client has been closed."
);

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
ty: PendingRequestTy::RoutedToTransaction(transaction_id),
combine_responses: 1,
});
Expand All @@ -3452,7 +3439,8 @@ The connection to the client has been closed."
tracing::debug!("Routing FindCoordinator to random broker {}", destination.0);

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
ty,
combine_responses: 1,
});
Expand Down

0 comments on commit b6ad0d5

Please sign in to comment.