Skip to content

Commit

Permalink
erlang_service: remove cursor
Browse files Browse the repository at this point in the history
Summary:
This removes the need to use `Cursor` and simplifies some other processing around the `Response` type.

When reading a request we now always expect an id + status - this makes implementation much simpler. On the Erlang side this only affects the `EXT` message where we just send some dummy data for those payloads.

Reviewed By: alanz

Differential Revision: D60514366

fbshipit-source-id: 97e82d7770e1280ab03c214dd0ca69116be1918a
  • Loading branch information
michalmuskala authored and facebook-github-bot committed Aug 8, 2024
1 parent baad626 commit bf00b02
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 70 deletions.
108 changes: 40 additions & 68 deletions crates/erlang_service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

use std::io::BufReader;
use std::io::BufWriter;
use std::io::Cursor;
use std::io::Read;
use std::io::Write;
use std::mem;
Expand Down Expand Up @@ -232,49 +231,33 @@ enum Response {
}

impl Response {
fn from_buf(buf: &[u8]) -> Result<(Id, Response)> {
let mut cursor = Cursor::new(buf);
let id = cursor.read_u64::<BigEndian>()?;
let status = cursor.read_u8()?;

let payload_buf = &cursor.get_ref();
let payload = payload_buf[(cursor.position() as usize)..].to_vec();
fn new(status: u8, id: Id, payload: Vec<u8>) -> Result<Response> {
match status {
0 => Ok((id, Response::Ok(payload))),
1 => Ok((id, Response::Err(payload))),
2 => Ok((id, Response::Callback(payload, id))),
n => {
log::warn!("Got an unexpected response status: {n}");
Ok((id, Response::Err(payload)))
}
}
}

fn payload(&self) -> &Payload {
match &self {
Response::Callback(payload, _) => payload,
Response::Ok(payload) => payload,
Response::Err(payload) => payload,
0 => Ok(Response::Ok(payload)),
1 => Ok(Response::Err(payload)),
2 => Ok(Response::Callback(payload, id)),
n => Err(anyhow!("Got an unexpected response status: {n}")),
}
}

fn decode_segments(self, mut f: impl FnMut(&[u8; 3], Vec<u8>) -> Result<()>) -> Result<()> {
let mut cursor = Cursor::new(self.payload());
if self.is_error() {
let mut buf = String::new();
cursor
.read_to_string(&mut buf)
.expect("malformed error message");
Err(anyhow!("erlang service failed with: {}", buf))
} else {
let mut tag = [0; 3];
while let Ok(()) = cursor.read_exact(&mut tag) {
let size = cursor.read_u32::<BigEndian>().expect("malformed segment");
let mut buf = vec![0; size as usize];
cursor.read_exact(&mut buf).expect("malformed segment");
f(&tag, buf)?
match self {
Response::Ok(payload) => {
let mut payload = &*payload;
let mut tag = [0; 3];
while let Ok(()) = payload.read_exact(&mut tag) {
let size = payload.read_u32::<BigEndian>().expect("malformed segment");
let mut buf = vec![0; size as usize];
payload.read_exact(&mut buf).expect("malformed segment");
f(&tag, buf)?
}
Ok(())
}
Ok(())
Response::Err(payload) => {
let err = String::from_utf8_lossy(&payload);
Err(anyhow!("erlang service failed with: {}", err))
}
Response::Callback(_, _) => panic!("unhandled callback response: {:?}", &self),
}
}

Expand All @@ -284,13 +267,6 @@ impl Response {
_ => false,
}
}

fn is_error(&self) -> bool {
match &self {
Response::Err(_) => true,
_ => false,
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -379,7 +355,7 @@ impl Connection {
tag: Tag,
request: Vec<u8>,
unwind: impl Fn(),
handle_callback: impl Fn(&Response) -> Result<Vec<u8>>,
handle_callback: impl Fn(Payload) -> Result<Vec<u8>>,
) -> Response {
let (sender, receiver) = bounded::<Response>(0);
self.sender
Expand All @@ -393,10 +369,9 @@ impl Connection {
// db on user edits
loop {
match receiver.recv_timeout(Duration::from_millis(100)) {
Ok(ref result @ Response::Callback(_, id)) => {
match handle_callback(&result) {
Ok(Response::Callback(payload, id)) => {
match handle_callback(payload) {
Ok(buf) => {
// Sender None means it won't update the inflight store
let request = (b"REP", buf, RequestType::CallbackReply(id));
self.sender.send(request).unwrap();
}
Expand Down Expand Up @@ -466,21 +441,20 @@ impl Connection {

fn handle_request_parse_callback(
&self,
reply: &Response,
request: Payload,
resolve_include: impl Fn(IncludeType, &str) -> Option<String>,
) -> Result<Vec<u8>> {
let opens = &reply.payload();
let mut buf = Vec::new();
if !opens.is_empty() {
let string_val = decode_utf8_or_latin1(opens.to_vec());
if !request.is_empty() {
let string_val = decode_utf8_or_latin1(request);
if let Some(resolved) = Self::do_resolve(&string_val, resolve_include) {
buf.write_u32::<BigEndian>(resolved.len() as u32)
.expect("buf write failed");
buf.write_all(resolved.as_bytes())
.expect("buf write failed");
}
} else {
log::warn!("handle_request_parse_callback: did not get OPN segment");
log::warn!("handle_request_parse_callback: empty server request");
}
Ok(buf)
}
Expand Down Expand Up @@ -651,26 +625,24 @@ fn reader_run(
) -> Result<()> {
loop {
let size = outstream.read_u32::<BigEndian>()? as usize;
let mut buf = vec![0; size];
outstream.read_exact(&mut buf)?;
if buf == b"EXT" {
let id = outstream.read_u64::<BigEndian>()?;
let status = outstream.read_u8()?;
let mut payload = vec![0; size - mem::size_of::<u64>() - mem::size_of::<u8>()];
outstream.read_exact(&mut payload)?;
if payload == b"EXT" {
log::info!("Reader terminating");
return Ok(());
}
let (id, response) = Response::from_buf(&buf)?;
if response.is_callback() {
let inflight = inflight.lock();
let response = Response::new(status, id, payload)?;
let sender = if response.is_callback() {
// Do not remove entry from inflight db
let sender = inflight.get(&id).expect("unexpected response id");
if let Err(err) = sender.send(response) {
log::info!("Got response {}, but request was canceled: {}", id, err);
};
inflight.lock().get(&id).cloned()
} else {
// Finished, remove entry from inflight
let sender = inflight.lock().remove(&id).expect("unexpected response id");
if let Err(err) = sender.send(response) {
log::info!("Got response {}, but request was canceled: {}", id, err);
};
inflight.lock().remove(&id)
};
if let Err(err) = sender.expect("unexpected callback id").send(response) {
log::info!("Got response {}, but request was canceled: {}", id, err);
};
}
}
Expand Down
3 changes: 1 addition & 2 deletions erlang_service/src/erlang_service_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ handle_info({IO, {data, Data}}, #{io := IO} = State) when is_binary(Data) ->
handle_request(Data, State);
handle_info({IO, eof}, #{io := IO} = State) ->
%% stdin closed, we're done
%% use port_command to make this a synchronous write
port_command(IO, <<"EXT">>),
reply(-1, <<"EXT">>, IO),
erlang:halt(0),
{noreply, State};
handle_info({timeout, Pid}, #{io := IO, requests := Requests} = State) ->
Expand Down

0 comments on commit bf00b02

Please sign in to comment.