Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions payjoin-cli/src/app/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,6 @@ impl App {
self.finalize_proposal(proposal, persister).await,
ReceiveSession::PayjoinProposal(proposal) =>
self.send_payjoin_proposal(proposal, persister).await,
ReceiveSession::Uninitialized =>
return Err(anyhow!("Uninitialized receiver session")),
ReceiveSession::TerminalFailure =>
return Err(anyhow!("Terminal receiver session")),
}
Expand Down
4 changes: 3 additions & 1 deletion payjoin-ffi/src/receive/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,6 @@ pub struct PsbtInputError(#[from] receive::PsbtInputError);
/// Error that may occur when a receiver event log is replayed
#[derive(Debug, thiserror::Error, uniffi::Object)]
#[error(transparent)]
pub struct ReceiverReplayError(#[from] receive::v2::ReplayError);
pub struct ReceiverReplayError(
#[from] payjoin::error::ReplayError<receive::v2::ReceiveSession, receive::v2::SessionEvent>,
);
2 changes: 0 additions & 2 deletions payjoin-ffi/src/receive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ impl ReceiverSessionEvent {

#[derive(Clone, uniffi::Enum)]
pub enum ReceiveSession {
Uninitialized,
Initialized { inner: Arc<Initialized> },
UncheckedOriginalPayload { inner: Arc<UncheckedOriginalPayload> },
MaybeInputsOwned { inner: Arc<MaybeInputsOwned> },
Expand All @@ -93,7 +92,6 @@ impl From<payjoin::receive::v2::ReceiveSession> for ReceiveSession {
fn from(value: payjoin::receive::v2::ReceiveSession) -> Self {
use payjoin::receive::v2::ReceiveSession;
match value {
ReceiveSession::Uninitialized => Self::Uninitialized,
ReceiveSession::Initialized(inner) =>
Self::Initialized { inner: Arc::new(inner.into()) },
ReceiveSession::UncheckedOriginalPayload(inner) =>
Expand Down
4 changes: 3 additions & 1 deletion payjoin-ffi/src/send/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ pub struct WellKnownError(#[from] send::WellKnownError);
/// Error that may occur when the sender session event log is replayed
#[derive(Debug, thiserror::Error, uniffi::Object)]
#[error(transparent)]
pub struct SenderReplayError(#[from] send::v2::ReplayError);
pub struct SenderReplayError(
#[from] payjoin::error::ReplayError<send::v2::SendSession, send::v2::SessionEvent>,
);

/// Error that may occur during state machine transitions
#[derive(Debug, thiserror::Error, uniffi::Error)]
Expand Down
2 changes: 0 additions & 2 deletions payjoin-ffi/src/send/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ impl SenderSessionEvent {

#[derive(Clone, uniffi::Enum)]
pub enum SendSession {
Uninitialized,
WithReplyKey { inner: Arc<WithReplyKey> },
V2GetContext { inner: Arc<V2GetContext> },
ProposalReceived { inner: Arc<Psbt> },
Expand All @@ -79,7 +78,6 @@ impl From<payjoin::send::v2::SendSession> for SendSession {
fn from(value: payjoin::send::v2::SendSession) -> Self {
use payjoin::send::v2::SendSession;
match value {
SendSession::Uninitialized => Self::Uninitialized,
SendSession::WithReplyKey(inner) =>
Self::WithReplyKey { inner: Arc::new(inner.into()) },
SendSession::V2GetContext(inner) =>
Expand Down
47 changes: 46 additions & 1 deletion payjoin/src/core/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt::Debug;
use std::{error, fmt};

#[derive(Debug)]
Expand All @@ -10,7 +11,7 @@ impl ImplementationError {
}

impl fmt::Display for ImplementationError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { self.0.fmt(f) }
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { std::fmt::Display::fmt(&self.0, f) }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mainly out of curiosity what compelled this change?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC there was a ambigious definition for fmt once I imported Debug.

}

impl error::Error for ImplementationError {
Expand All @@ -33,3 +34,47 @@ impl From<&str> for ImplementationError {
ImplementationError::from(error)
}
}
/// Errors that can occur when replaying a session event log
#[cfg(feature = "v2")]
#[derive(Debug)]
pub struct ReplayError<SessionState, SessionEvent>(InternalReplayError<SessionState, SessionEvent>);

#[cfg(feature = "v2")]
impl<SessionState: Debug, SessionEvent: Debug> std::fmt::Display
for ReplayError<SessionState, SessionEvent>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use InternalReplayError::*;
match &self.0 {
NoEvents => write!(f, "No events found in session"),
InvalidEvent(event, session) => match session {
Some(session) => write!(f, "Invalid event ({event:?}) for session ({session:?})",),
None => write!(f, "Invalid first event ({event:?}) for session",),
},
PersistenceFailure(e) => write!(f, "Persistence failure: {e}"),
}
}
}
#[cfg(feature = "v2")]
impl<SessionState: Debug, SessionEvent: Debug> std::error::Error
for ReplayError<SessionState, SessionEvent>
{
}

#[cfg(feature = "v2")]
impl<SessionState: Debug, SessionEvent: Debug> From<InternalReplayError<SessionState, SessionEvent>>
for ReplayError<SessionState, SessionEvent>
{
fn from(e: InternalReplayError<SessionState, SessionEvent>) -> Self { ReplayError(e) }
}

#[cfg(feature = "v2")]
#[derive(Debug)]
pub(crate) enum InternalReplayError<SessionState, SessionEvent> {
/// No events in the event log
NoEvents,
/// Invalid initial event
InvalidEvent(Box<SessionEvent>, Option<Box<SessionState>>),
/// Application storage error
PersistenceFailure(ImplementationError),
}
27 changes: 13 additions & 14 deletions payjoin/src/core/receive/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ pub(crate) use error::InternalSessionError;
pub use error::SessionError;
use serde::de::Deserializer;
use serde::{Deserialize, Serialize};
use session::InternalReplayError;
pub use session::{replay_event_log, ReplayError, SessionEvent, SessionHistory};
pub use session::{replay_event_log, SessionEvent, SessionHistory};
use url::Url;

use super::error::{Error, InputContributionError};
use super::{
common, InternalPayloadError, JsonReply, OutputSubstitutionError, ProtocolError, SelectionError,
};
use crate::error::{InternalReplayError, ReplayError};
use crate::hpke::{decrypt_message_a, encrypt_message_b, HpkeKeyPair, HpkePublicKey};
use crate::ohttp::{
ohttp_encapsulate, process_get_res, process_post_res, OhttpEncapsulationError, OhttpKeys,
Expand Down Expand Up @@ -121,14 +121,12 @@ fn short_id_from_pubkey(pubkey: &HpkePublicKey) -> ShortId {
}

/// Represents the various states of a Payjoin receiver session during the protocol flow.
/// Each variant parameterizes a `Receiver` with a specific state type, except for [`ReceiveSession::Uninitialized`] which
/// has no context yet and [`ReceiveSession::TerminalFailure`] which indicates the session has ended or is invalid.
/// Each variant parameterizes a `Receiver` with a specific state type, and [`ReceiveSession::TerminalFailure`] which indicates the session has ended or is invalid.
///
/// This provides type erasure for the receive session state, allowing for the session to be replayed
/// and the state to be updated with the next event over a uniform interface.
#[derive(Debug, Clone, PartialEq)]
pub enum ReceiveSession {
Uninitialized,
Initialized(Receiver<Initialized>),
UncheckedOriginalPayload(Receiver<UncheckedOriginalPayload>),
MaybeInputsOwned(Receiver<MaybeInputsOwned>),
Expand All @@ -143,14 +141,15 @@ pub enum ReceiveSession {
}

impl ReceiveSession {
fn process_event(self, event: SessionEvent) -> Result<ReceiveSession, ReplayError> {
match (self, event) {
(ReceiveSession::Uninitialized, SessionEvent::Created(context)) =>
Ok(ReceiveSession::Initialized(Receiver {
state: Initialized {},
session_context: context,
})),
fn new(context: SessionContext) -> Self {
ReceiveSession::Initialized(Receiver { state: Initialized {}, session_context: context })
}

fn process_event(
self,
event: SessionEvent,
) -> Result<ReceiveSession, ReplayError<Self, SessionEvent>> {
match (self, event) {
(
ReceiveSession::Initialized(state),
SessionEvent::UncheckedOriginalPayload { original: proposal, reply_key },
Expand Down Expand Up @@ -185,9 +184,9 @@ impl ReceiveSession {
) => Ok(state.apply_payjoin_proposal(payjoin_proposal)),

(_, SessionEvent::SessionInvalid(_, _)) => Ok(ReceiveSession::TerminalFailure),
(current_state, event) => Err(InternalReplayError::InvalidStateAndEvent(
Box::new(current_state),
(current_state, event) => Err(InternalReplayError::InvalidEvent(
Box::new(event),
Some(Box::new(current_state)),
)
.into()),
}
Expand Down
48 changes: 13 additions & 35 deletions payjoin/src/core/receive/v2/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,56 +3,34 @@ use std::time::SystemTime;
use serde::{Deserialize, Serialize};

use super::{ReceiveSession, SessionContext};
use crate::error::{InternalReplayError, ReplayError};
use crate::output_substitution::OutputSubstitution;
use crate::persist::SessionPersister;
use crate::receive::v2::{extract_err_req, InternalSessionError, SessionError};
use crate::receive::{common, JsonReply, OriginalPayload, PsbtContext};
use crate::{ImplementationError, IntoUrl, PjUri, Request};

/// Errors that can occur when replaying a receiver event log
#[derive(Debug)]
pub struct ReplayError(InternalReplayError);

impl std::fmt::Display for ReplayError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use InternalReplayError::*;
match &self.0 {
InvalidStateAndEvent(state, event) => write!(
f,
"Invalid combination of state ({state:?}) and event ({event:?}) during replay",
),
PersistenceFailure(e) => write!(f, "Persistence failure: {e}"),
}
}
}
impl std::error::Error for ReplayError {}

impl From<InternalReplayError> for ReplayError {
fn from(e: InternalReplayError) -> Self { ReplayError(e) }
}

#[derive(Debug)]
pub(crate) enum InternalReplayError {
/// Invalid combination of state and event
InvalidStateAndEvent(Box<ReceiveSession>, Box<SessionEvent>),
/// Application storage error
PersistenceFailure(ImplementationError),
}

/// Replay a receiver event log to get the receiver in its current state [ReceiveSession]
/// and a session history [SessionHistory]
pub fn replay_event_log<P>(persister: &P) -> Result<(ReceiveSession, SessionHistory), ReplayError>
pub fn replay_event_log<P>(
persister: &P,
) -> Result<(ReceiveSession, SessionHistory), ReplayError<ReceiveSession, SessionEvent>>
where
P: SessionPersister,
P::SessionEvent: Into<SessionEvent> + Clone,
P::SessionEvent: From<SessionEvent>,
{
let logs = persister
let mut logs = persister
.load()
.map_err(|e| InternalReplayError::PersistenceFailure(ImplementationError::new(e)))?;
let mut receiver = ReceiveSession::Uninitialized;
let mut history = SessionHistory::default();

let mut history = SessionHistory::default();
let first_event = logs.next().ok_or(InternalReplayError::NoEvents)?.into();
history.events.push(first_event.clone());
let mut receiver = match first_event {
SessionEvent::Created(context) => ReceiveSession::new(context),
_ => return Err(InternalReplayError::InvalidEvent(Box::new(first_event), None).into()),
};
Comment on lines +27 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @nothingmuch that this should be encapsulated in a process_events function that takes the [Into?]Iterator of events (aka logs) but that can be a follow up

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could probably even be shared to de-duplicate code between the sender/receiver with some generic fn. Worth it if it's more beautiful and easier to maintain symmetry but not strictly neccessary

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree on both counts (there is potential for refactoring but also since it's all private it doesn't need to be in this PR)

for event in logs {
history.events.push(event.clone().into());
receiver = receiver.process_event(event.into()).map_err(|e| {
Expand Down Expand Up @@ -211,7 +189,7 @@ pub enum SessionEvent {

#[cfg(test)]
mod tests {
use std::time::Duration;
use std::time::{Duration, SystemTime};

use payjoin_test_utils::{BoxError, EXAMPLE_URL};

Expand Down
18 changes: 10 additions & 8 deletions payjoin/src/core/send/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,17 @@ pub use error::{CreateRequestError, EncapsulationError};
use error::{InternalCreateRequestError, InternalEncapsulationError};
use ohttp::ClientResponse;
use serde::{Deserialize, Serialize};
pub use session::{replay_event_log, ReplayError, SessionEvent, SessionHistory};
pub use session::{replay_event_log, SessionEvent, SessionHistory};
use url::Url;

use super::error::BuildSenderError;
use super::*;
use crate::error::{InternalReplayError, ReplayError};
use crate::hpke::{decrypt_message_b, encrypt_message_a, HpkeSecretKey};
use crate::ohttp::{ohttp_encapsulate, process_get_res, process_post_res};
use crate::persist::{
MaybeFatalTransition, MaybeSuccessTransitionWithNoResults, NextStateTransition,
};
use crate::send::v2::session::InternalReplayError;
use crate::uri::v2::PjParam;
use crate::uri::ShortId;
use crate::{HpkeKeyPair, HpkePublicKey, IntoUrl, OhttpKeys, PjUri, Request};
Expand Down Expand Up @@ -211,26 +211,28 @@ impl<State> core::ops::DerefMut for Sender<State> {
/// and the state to be updated with the next event over a uniform interface.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SendSession {
Uninitialized,
WithReplyKey(Sender<WithReplyKey>),
V2GetContext(Sender<V2GetContext>),
ProposalReceived(Psbt),
TerminalFailure,
}

impl SendSession {
fn process_event(self, event: SessionEvent) -> Result<SendSession, ReplayError> {
fn new(context: WithReplyKey) -> Self { SendSession::WithReplyKey(Sender { state: context }) }

fn process_event(
self,
event: SessionEvent,
) -> Result<SendSession, ReplayError<Self, SessionEvent>> {
match (self, event) {
(SendSession::Uninitialized, SessionEvent::CreatedReplyKey(sender_with_reply_key)) =>
Ok(SendSession::WithReplyKey(Sender { state: sender_with_reply_key })),
(SendSession::WithReplyKey(state), SessionEvent::V2GetContext(v2_get_context)) =>
Ok(state.apply_v2_get_context(v2_get_context)),
(SendSession::V2GetContext(_state), SessionEvent::ProposalReceived(proposal)) =>
Ok(SendSession::ProposalReceived(proposal)),
(_, SessionEvent::SessionInvalid(_)) => Ok(SendSession::TerminalFailure),
(current_state, event) => Err(InternalReplayError::InvalidStateAndEvent(
Box::new(current_state),
(current_state, event) => Err(InternalReplayError::InvalidEvent(
Box::new(event),
Some(Box::new(current_state)),
)
.into()),
}
Expand Down
48 changes: 13 additions & 35 deletions payjoin/src/core/send/v2/session.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,33 @@
use super::WithReplyKey;
use crate::error::{InternalReplayError, ReplayError};
use crate::persist::SessionPersister;
use crate::send::v2::{SendSession, V2GetContext};
use crate::uri::v2::PjParam;
use crate::ImplementationError;
/// Errors that can occur when replaying a sender event log
#[derive(Debug)]
pub struct ReplayError(InternalReplayError);

impl std::fmt::Display for ReplayError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use InternalReplayError::*;
match &self.0 {
InvalidStateAndEvent(state, event) => write!(
f,
"Invalid combination of state ({state:?}) and event ({event:?}) during replay",
),
PersistenceFailure(e) => write!(f, "Persistence failure: {e}"),
}
}
}
impl std::error::Error for ReplayError {}

impl From<InternalReplayError> for ReplayError {
fn from(e: InternalReplayError) -> Self { ReplayError(e) }
}

#[derive(Debug)]
pub(crate) enum InternalReplayError {
/// Invalid combination of state and event
InvalidStateAndEvent(Box<SendSession>, Box<SessionEvent>),
/// Application storage error
PersistenceFailure(ImplementationError),
}

pub fn replay_event_log<P>(persister: &P) -> Result<(SendSession, SessionHistory), ReplayError>
pub fn replay_event_log<P>(
persister: &P,
) -> Result<(SendSession, SessionHistory), ReplayError<SendSession, SessionEvent>>
where
P: SessionPersister + Clone,
P::SessionEvent: Into<SessionEvent> + Clone,
P::SessionEvent: From<SessionEvent>,
{
let logs = persister
let mut logs = persister
.load()
.map_err(|e| InternalReplayError::PersistenceFailure(ImplementationError::new(e)))?;

let mut sender = SendSession::Uninitialized;
let mut history = SessionHistory::default();
let first_event = logs.next().ok_or(InternalReplayError::NoEvents)?.into();
history.events.push(first_event.clone());
let mut sender = match first_event {
SessionEvent::CreatedReplyKey(reply_key) => SendSession::new(reply_key),
_ => return Err(InternalReplayError::InvalidEvent(Box::new(first_event), None).into()),
};

for log in logs {
let session_event = log.into();
history.events.push(session_event.clone());
let current_sender = std::mem::replace(&mut sender, SendSession::Uninitialized);
match current_sender.process_event(session_event) {
match sender.clone().process_event(session_event) {
Ok(next_sender) => sender = next_sender,
Err(_e) => {
persister.close().map_err(|e| {
Expand Down
Loading