Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 18 additions & 33 deletions payjoin-cli/src/app/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use payjoin::bitcoin::consensus::encode::serialize_hex;
use payjoin::bitcoin::{Amount, FeeRate};
use payjoin::persist::OptionalTransitionOutcome;
use payjoin::receive::v2::{
process_err_res, replay_event_log as replay_receiver_event_log, Initialized, MaybeInputsOwned,
MaybeInputsSeen, OutputsUnknown, PayjoinProposal, ProvisionalProposal, ReceiveSession,
Receiver, ReceiverBuilder, SessionHistory, UncheckedOriginalPayload, WantsFeeRange,
replay_event_log as replay_receiver_event_log, HasReplyableError, Initialized,
MaybeInputsOwned, MaybeInputsSeen, OutputsUnknown, PayjoinProposal, ProvisionalProposal,
ReceiveSession, Receiver, ReceiverBuilder, UncheckedOriginalPayload, WantsFeeRange,
WantsInputs, WantsOutputs,
};
use payjoin::send::v2::{
Expand Down Expand Up @@ -70,7 +70,7 @@ impl StatusText for ReceiveSession {
| ReceiveSession::WantsFeeRange(_)
| ReceiveSession::ProvisionalProposal(_) => "Processing original proposal",
ReceiveSession::PayjoinProposal(_) => "Payjoin proposal sent",
ReceiveSession::TerminalFailure => "Session failure",
ReceiveSession::HasReplyableError(_) => "Session failure",
}
}
}
Expand Down Expand Up @@ -374,7 +374,9 @@ impl AppTrait for App {
role: Role::Receiver,
status: receiver_state,
completed_at: Some(completed_at),
error_message: session_history.terminal_error().map(|e| e.0),
error_message: session_history
.terminal_error()
.map(|e| e.to_json().to_string()),
};
recv_rows.push(row);
}
Expand Down Expand Up @@ -519,22 +521,11 @@ impl App {
self.finalize_proposal(proposal, persister).await,
ReceiveSession::PayjoinProposal(proposal) =>
self.send_payjoin_proposal(proposal, persister).await,
ReceiveSession::TerminalFailure =>
return Err(anyhow!("Terminal receiver session")),
ReceiveSession::HasReplyableError(error) =>
self.handle_error(error, persister).await,
}
};

match res {
Ok(_) => Ok(()),
Err(e) => {
let (_, session_history) = replay_receiver_event_log(persister)?;
let pj_uri = session_history.pj_uri().extras.endpoint().clone();
let ohttp_relay = self.unwrap_relay_or_else_fetch(Some(pj_uri)).await?;
self.handle_recoverable_error(&ohttp_relay, &session_history).await?;

Err(e)
}
}
res
}

#[allow(clippy::incompatible_msrv)]
Expand Down Expand Up @@ -700,20 +691,14 @@ impl App {
Ok(ohttp_relay)
}

/// Handle request error by sending an error response over the directory
async fn handle_recoverable_error(
/// Handle error by attempting to send an error response over the directory
async fn handle_error(
&self,
ohttp_relay: &payjoin::Url,
session_history: &SessionHistory,
session: Receiver<HasReplyableError>,
persister: &ReceiverPersister,
) -> Result<()> {
Comment on lines +695 to 699
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other than an Err(anyhow::Error) being propagated if post_request or process_error_response fails, are there circumstances where we want to close the session in this function? for example, if process_error_response fails? Or if the post_request fails for one of a few enumerated reasons as a follow up with the session.fail() implementation?

let e = match session_history.terminal_error() {
Some((_, Some(e))) => e,
_ => return Ok(()),
};
let (err_req, err_ctx) = session_history
.extract_err_req(ohttp_relay.as_str())?
.expect("If JsonReply is Some, then err_req and err_ctx should be Some");
let to_return = anyhow!("Replied with error: {}", e.to_json());
let (err_req, err_ctx) =
session.create_error_request(self.unwrap_relay_or_else_fetch(None).await?.as_str())?;

let err_response = match self.post_request(err_req).await {
Ok(response) => response,
Expand All @@ -725,11 +710,11 @@ impl App {
Err(e) => return Err(anyhow!("Failed to get error response bytes: {}", e)),
};

if let Err(e) = process_err_res(&err_bytes, err_ctx) {
if let Err(e) = session.process_error_response(&err_bytes, err_ctx).save(persister) {
return Err(anyhow!("Failed to process error response: {}", e));
}

Err(to_return)
Ok(())
}

async fn post_request(&self, req: payjoin::Request) -> Result<reqwest::Response> {
Expand Down
126 changes: 84 additions & 42 deletions payjoin-ffi/src/receive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub enum ReceiveSession {
WantsFeeRange { inner: Arc<WantsFeeRange> },
ProvisionalProposal { inner: Arc<ProvisionalProposal> },
PayjoinProposal { inner: Arc<PayjoinProposal> },
TerminalFailure,
HasReplyableError { inner: Arc<HasReplyableError> },
}

impl From<payjoin::receive::v2::ReceiveSession> for ReceiveSession {
Expand All @@ -105,7 +105,8 @@ impl From<payjoin::receive::v2::ReceiveSession> for ReceiveSession {
Self::ProvisionalProposal { inner: Arc::new(inner.into()) },
ReceiveSession::PayjoinProposal(inner) =>
Self::PayjoinProposal { inner: Arc::new(inner.into()) },
ReceiveSession::TerminalFailure => Self::TerminalFailure,
ReceiveSession::HasReplyableError(inner) =>
Self::HasReplyableError { inner: Arc::new(inner.into()) },
}
}
}
Expand Down Expand Up @@ -139,51 +140,20 @@ impl From<payjoin::receive::v2::SessionHistory> for SessionHistory {
fn from(value: payjoin::receive::v2::SessionHistory) -> Self { Self(value) }
}

#[derive(uniffi::Object)]
pub struct TerminalErr {
error: String,
reply: Option<JsonReply>,
}

#[uniffi::export]
impl TerminalErr {
pub fn error(&self) -> String { self.error.clone() }

pub fn reply(&self) -> Option<Arc<JsonReply>> { self.reply.clone().map(Arc::new) }
}

#[uniffi::export]
impl SessionHistory {
/// Receiver session Payjoin URI
pub fn pj_uri(&self) -> Arc<crate::PjUri> { Arc::new(self.0.pj_uri().into()) }

/// Terminal error from the session if present
pub fn terminal_error(&self) -> Option<Arc<TerminalErr>> {
self.0.terminal_error().map(|(error, reply)| {
Arc::new(TerminalErr { error, reply: reply.map(|reply| reply.into()) })
})
pub fn terminal_error(&self) -> Option<Arc<JsonReply>> {
self.0.terminal_error().map(|reply| Arc::new(reply.into()))
}

/// Fallback transaction from the session if present
pub fn fallback_tx(&self) -> Option<Arc<crate::Transaction>> {
self.0.fallback_tx().map(|tx| Arc::new(tx.into()))
}

/// Construct the error request to be posted on the directory if an error occurred.
/// To process the response, use [process_err_res]
pub fn extract_err_req(
&self,
ohttp_relay: String,
) -> Result<Option<RequestResponse>, SessionError> {
match self.0.extract_err_req(ohttp_relay) {
Ok(Some((request, ctx))) => Ok(Some(RequestResponse {
request: request.into(),
client_response: Arc::new(ctx.into()),
})),
Ok(None) => Ok(None),
Err(e) => Err(SessionError::from(e)),
}
}
}

#[derive(uniffi::Object)]
Expand Down Expand Up @@ -423,6 +393,7 @@ pub struct UncheckedOriginalPayloadTransition(
payjoin::receive::v2::SessionEvent,
payjoin::receive::v2::Receiver<payjoin::receive::v2::MaybeInputsOwned>,
payjoin::receive::Error,
payjoin::receive::v2::Receiver<payjoin::receive::v2::HasReplyableError>,
>,
>,
>,
Expand Down Expand Up @@ -484,12 +455,6 @@ impl UncheckedOriginalPayload {
}
}

/// Process an OHTTP Encapsulated HTTP POST Error response
/// to ensure it has been posted properly
#[uniffi::export]
pub fn process_err_res(body: &[u8], context: &ClientResponse) -> Result<(), SessionError> {
payjoin::receive::v2::process_err_res(body, context.into()).map_err(Into::into)
}
#[derive(Clone, uniffi::Object)]
pub struct MaybeInputsOwned(payjoin::receive::v2::Receiver<payjoin::receive::v2::MaybeInputsOwned>);

Expand All @@ -511,6 +476,7 @@ pub struct MaybeInputsOwnedTransition(
payjoin::receive::v2::SessionEvent,
payjoin::receive::v2::Receiver<payjoin::receive::v2::MaybeInputsSeen>,
payjoin::receive::Error,
payjoin::receive::v2::Receiver<payjoin::receive::v2::HasReplyableError>,
>,
>,
>,
Expand Down Expand Up @@ -565,6 +531,7 @@ pub struct MaybeInputsSeenTransition(
payjoin::receive::v2::SessionEvent,
payjoin::receive::v2::Receiver<payjoin::receive::v2::OutputsUnknown>,
payjoin::receive::Error,
payjoin::receive::v2::Receiver<payjoin::receive::v2::HasReplyableError>,
>,
>,
>,
Expand Down Expand Up @@ -617,6 +584,7 @@ pub struct OutputsUnknownTransition(
payjoin::receive::v2::SessionEvent,
payjoin::receive::v2::Receiver<payjoin::receive::v2::WantsOutputs>,
payjoin::receive::Error,
payjoin::receive::v2::Receiver<payjoin::receive::v2::HasReplyableError>,
>,
>,
>,
Expand Down Expand Up @@ -943,7 +911,7 @@ pub struct PayjoinProposalTransition(
payjoin::persist::MaybeSuccessTransition<
payjoin::receive::v2::SessionEvent,
(),
payjoin::receive::Error,
payjoin::receive::ProtocolError,
>,
>,
>,
Expand Down Expand Up @@ -1015,6 +983,80 @@ impl PayjoinProposal {
}
}

#[derive(Clone, uniffi::Object)]
pub struct HasReplyableError(
pub payjoin::receive::v2::Receiver<payjoin::receive::v2::HasReplyableError>,
);

impl From<HasReplyableError>
for payjoin::receive::v2::Receiver<payjoin::receive::v2::HasReplyableError>
{
fn from(value: HasReplyableError) -> Self { value.0 }
}

impl From<payjoin::receive::v2::Receiver<payjoin::receive::v2::HasReplyableError>>
for HasReplyableError
{
fn from(
value: payjoin::receive::v2::Receiver<payjoin::receive::v2::HasReplyableError>,
) -> Self {
Self(value)
}
}

#[derive(uniffi::Object)]
pub struct HasReplyableErrorTransition(
Arc<
RwLock<
Option<
payjoin::persist::MaybeSuccessTransition<
payjoin::receive::v2::SessionEvent,
(),
payjoin::receive::Error,
>,
>,
>,
>,
);

#[uniffi::export]
impl HasReplyableErrorTransition {
pub fn save(
&self,
persister: Arc<dyn JsonReceiverSessionPersister>,
) -> Result<(), ReceiverPersistedError> {
let adapter = CallbackPersisterAdapter::new(persister);
let mut inner = self.0.write().expect("Lock should not be poisoned");

let value = inner.take().expect("Already saved or moved");

value.save(&adapter).map_err(ReceiverPersistedError::from)?;
Ok(())
}
}

#[uniffi::export]
impl HasReplyableError {
pub fn create_error_request(
&self,
ohttp_relay: String,
) -> Result<RequestResponse, SessionError> {
self.0.clone().create_error_request(ohttp_relay).map_err(Into::into).map(|(req, ctx)| {
RequestResponse { request: req.into(), client_response: Arc::new(ctx.into()) }
})
}

pub fn process_error_response(
&self,
body: &[u8],
ohttp_context: &ClientResponse,
) -> PayjoinProposalTransition {
PayjoinProposalTransition(Arc::new(RwLock::new(Some(
self.0.clone().process_error_response(body, ohttp_context.into()),
))))
}
}

/// Session persister that should save and load events as JSON strings.
#[uniffi::export(with_foreign)]
pub trait JsonReceiverSessionPersister: Send + Sync {
Expand Down
Loading
Loading