Skip to content

Commit

Permalink
17/n make erlang_service Response an enum
Browse files Browse the repository at this point in the history
Summary: Instead of `ResponseType`, make `Response` as a whole an equivalent enum.

Reviewed By: michalmuskala

Differential Revision: D60594861

fbshipit-source-id: 01b29a61416f12cfc366c513cd2314e1eaeaed76
  • Loading branch information
alanz authored and facebook-github-bot committed Aug 2, 2024
1 parent 07b62a7 commit f36bb54
Showing 1 changed file with 64 additions and 61 deletions.
125 changes: 64 additions & 61 deletions crates/erlang_service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,40 +221,46 @@ enum RequestType {
}

type Tag = &'static [u8; 3];
type Id = u64;
type Payload = Vec<u8>;

#[derive(Debug)]
// Second element is the inflight id for responding to callback requests.
struct Response {
payload: Cursor<Vec<u8>>,
id: u64,
status: ResponseStatus,
enum Response {
Callback(Payload, Id),
Ok(Payload),
Err(Payload),
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum ResponseStatus {
Ok, // 0
Error, // 1
CallBack, // 2
}

impl From<u8> for ResponseStatus {
fn from(value: u8) -> Self {
match value {
0 => ResponseStatus::Ok,
1 => ResponseStatus::Error,
2 => ResponseStatus::CallBack,
impl Response {
fn from_buf(buf: &[u8]) -> Result<(Id, Response)> {
let mut cursor = Cursor::new(buf);
let id = cursor.read_u64::<BigEndian>()?;
let status = cursor.read_u8()?;

let payload_buf = &cursor.get_ref();
let payload = payload_buf[(cursor.position() as usize)..].to_vec();
match status {
0 => Ok((id, Response::Ok(payload))),
1 => Ok((id, Response::Err(payload))),
2 => Ok((id, Response::Callback(payload, id))),
n => {
log::warn!("Got an unexpected ResponseStatus: {n}");
ResponseStatus::Error
log::warn!("Got an unexpected response status: {n}");
Ok((id, Response::Err(payload)))
}
}
}
}

impl Response {
fn payload(&self) -> &Payload {
match &self {
Response::Callback(payload, _) => payload,
Response::Ok(payload) => payload,
Response::Err(payload) => payload,
}
}

fn decode_segments(self, mut f: impl FnMut(&[u8; 3], Vec<u8>) -> Result<()>) -> Result<()> {
let mut cursor = self.payload;
if self.status == ResponseStatus::Error {
let mut cursor = Cursor::new(self.payload());
if self.is_error() {
let mut buf = String::new();
cursor
.read_to_string(&mut buf)
Expand All @@ -273,7 +279,17 @@ impl Response {
}

fn is_callback(&self) -> bool {
self.status == ResponseStatus::CallBack
match &self {
Response::Callback(_, _) => true,
_ => false,
}
}

fn is_error(&self) -> bool {
match &self {
Response::Err(_) => true,
_ => false,
}
}
}

Expand Down Expand Up @@ -363,7 +379,7 @@ impl Connection {
tag: Tag,
request: Vec<u8>,
unwind: impl Fn(),
handle_callback: impl Fn(Response) -> Result<Vec<u8>>,
handle_callback: impl Fn(&Response) -> Result<Vec<u8>>,
) -> Response {
let (sender, receiver) = bounded::<Response>(0);
self.sender
Expand All @@ -377,27 +393,25 @@ impl Connection {
// db on user edits
loop {
match receiver.recv_timeout(Duration::from_millis(100)) {
Ok(result) => {
let id = result.id;
if result.is_callback() {
match handle_callback(result) {
Ok(buf) => {
// Sender None means it won't update the inflight store
let request = (b"REP", buf, RequestType::CallbackReply(id));
self.sender.send(request).unwrap();
}
Err(err) => {
log::warn!("handle_callback gave err: {}, for {}", err, id);
// We must always reply, else the erlang side hangs
self.sender
.send((b"REP", Vec::new(), RequestType::CallbackReply(id)))
.unwrap();
}
Ok(ref result @ Response::Callback(_, id)) => {
match handle_callback(&result) {
Ok(buf) => {
// Sender None means it won't update the inflight store
let request = (b"REP", buf, RequestType::CallbackReply(id));
self.sender.send(request).unwrap();
}
Err(err) => {
log::warn!("handle_callback gave err: {}, for {}", err, id);
// We must always reply, else the erlang side hangs
self.sender
.send((b"REP", Vec::new(), RequestType::CallbackReply(id)))
.unwrap();
}
} else {
return result;
}
}
Ok(result) => {
return result;
}
Err(_) => unwind(),
}
}
Expand Down Expand Up @@ -452,11 +466,10 @@ impl Connection {

fn handle_request_parse_callback(
&self,
reply: Response,
reply: &Response,
resolve_include: impl Fn(IncludeType, &str) -> Option<String>,
) -> Result<Vec<u8>> {
let payload_buf = &reply.payload.get_ref();
let opens = &payload_buf[(reply.payload.position() as usize)..];
let opens = &reply.payload();
let mut buf = Vec::new();
if !opens.is_empty() {
let string_val = decode_utf8_or_latin1(opens.to_vec());
Expand Down Expand Up @@ -644,28 +657,18 @@ fn reader_run(
log::info!("Reader terminating");
return Ok(());
}
let mut cursor = Cursor::new(buf);
let id = cursor.read_u64::<BigEndian>()?;
let status = cursor.read_u8()?.into();
if status == ResponseStatus::CallBack {
let (id, response) = Response::from_buf(&buf)?;
if response.is_callback() {
let inflight = inflight.lock();
// Do not remove entry from inflight db
let sender = inflight.get(&id).expect("unexpected response id");
if let Err(err) = sender.send(Response {
payload: cursor,
id,
status,
}) {
if let Err(err) = sender.send(response) {
log::info!("Got response {}, but request was canceled: {}", id, err);
};
} else {
// Finished, remove entry from inflight
let sender = inflight.lock().remove(&id).expect("unexpected response id");
if let Err(err) = sender.send(Response {
payload: cursor,
id,
status,
}) {
if let Err(err) = sender.send(response) {
log::info!("Got response {}, but request was canceled: {}", id, err);
};
};
Expand Down

0 comments on commit f36bb54

Please sign in to comment.