Skip to content

Commit

Permalink
16/n Do not send extra Id in callback response to erlang_service
Browse files Browse the repository at this point in the history
Summary: As title

Reviewed By: michalmuskala

Differential Revision: D60582681

fbshipit-source-id: c62a62768872ae7e78b7419c8939bf8c0ecbeca9
  • Loading branch information
alanz authored and facebook-github-bot committed Aug 1, 2024
1 parent 365668d commit 07b62a7
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 16 deletions.
38 changes: 23 additions & 15 deletions crates/erlang_service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,13 @@ pub struct DocResult {
pub diagnostics: Vec<DocDiagnostic>,
}

type Request = (Tag, Vec<u8>, Option<Sender<Response>>);
type Request = (Tag, Vec<u8>, RequestType);

enum RequestType {
Sender(Sender<Response>),
CallbackReply(u64), // Original Id
NoReply,
}

type Tag = &'static [u8; 3];

Expand Down Expand Up @@ -361,7 +367,7 @@ impl Connection {
) -> Response {
let (sender, receiver) = bounded::<Response>(0);
self.sender
.send((tag, request, Some(sender)))
.send((tag, request, RequestType::Sender(sender)))
.expect("failed sending request to parse server");

// Every 100ms check if the db was cancelled by calling back to db.
Expand All @@ -377,13 +383,15 @@ impl Connection {
match handle_callback(result) {
Ok(buf) => {
// Sender None means it won't update the inflight store
let request = (b"OPN", buf, None);
let request = (b"REP", buf, RequestType::CallbackReply(id));
self.sender.send(request).unwrap();
}
Err(err) => {
log::warn!("handle_callback gave err: {}, for {}", err, id);
// We must always reply, else the erlang side hangs
self.sender.send((b"OPN", Vec::new(), None)).unwrap();
self.sender
.send((b"REP", Vec::new(), RequestType::CallbackReply(id)))
.unwrap();
}
}
} else {
Expand Down Expand Up @@ -447,21 +455,16 @@ impl Connection {
reply: Response,
resolve_include: impl Fn(IncludeType, &str) -> Option<String>,
) -> Result<Vec<u8>> {
let id = reply.id;
let payload_buf = &reply.payload.get_ref();
let opens = &payload_buf[(reply.payload.position() as usize)..];
let mut buf = Vec::new();
if !opens.is_empty() {
let string_val = decode_utf8_or_latin1(opens.to_vec());
if let Some(resolved) = Self::do_resolve(&string_val, resolve_include) {
buf.write_u64::<BigEndian>(id).expect("buf write failed");
buf.write_u32::<BigEndian>(resolved.len() as u32)
.expect("buf write failed");
buf.write_all(resolved.as_bytes())
.expect("buf write failed");
} else {
// We must always send a reply, even if empty
buf.write_u64::<BigEndian>(id).expect("buf write failed");
}
} else {
log::warn!("handle_request_parse_callback: did not get OPN segment");
Expand Down Expand Up @@ -590,7 +593,7 @@ impl Connection {
.expect("buf write failed");
buf.write_all(path.as_bytes()).expect("buf write failed");
}
let request = (b"ACP", buf, None);
let request = (b"ACP", buf, RequestType::NoReply);
self.sender.send(request).unwrap();
}
}
Expand Down Expand Up @@ -678,14 +681,19 @@ fn writer_run(
receiver
.into_iter()
.try_for_each(|(tag, request, sender)| {
counter += 1;
if let Some(sender) = sender {
inflight.lock().insert(counter, sender);
}
let id = match sender {
RequestType::Sender(sender) => {
counter += 1;
inflight.lock().insert(counter, sender);
counter
}
RequestType::CallbackReply(original_id) => original_id,
RequestType::NoReply => counter,
};
let len = tag.len() + mem::size_of::<u64>() + request.len();
instream.write_u32::<BigEndian>(len.try_into().expect("message too large"))?;
instream.write_all(tag)?;
instream.write_u64::<BigEndian>(counter)?;
instream.write_u64::<BigEndian>(id)?;
instream.write_all(&request)?;
instream.flush()
})?;
Expand Down
3 changes: 2 additions & 1 deletion erlang_service/src/erlang_service_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ handle_request(<<"DCP", Id:64/big, Data/binary>>, State) ->
request(erlang_service_edoc, Id, Data, [eep48], infinity, State);
handle_request(<<"CTI", Id:64/big, Data/binary>>, State) ->
request(erlang_service_ct, Id, Data, [], 10_000, State);
handle_request(<<"OPN", _:64/big, OrigId:64/big, Data/binary>>,
%% Start of callback responses
handle_request(<<"REP", OrigId:64/big, Data/binary>>,
#{own_requests := OwnRequests} = State) ->
Path = collect_paths(Data),
case lists:keytake(OrigId, 1, OwnRequests) of
Expand Down

0 comments on commit 07b62a7

Please sign in to comment.