Skip to content

Commit

Permalink
feat: improve synchronization infrastructure (#64)
Browse files Browse the repository at this point in the history
* feat: generalized ledger infrastructure and add correct pruning
* feat: add more randomness/concurrency to simulation
* feat: add iteration features for manta_util::array
* feat: add sign method to separate post into pieces
* feat: use more sharding-optimal coin selection algorithm
  • Loading branch information
bhgomes authored May 13, 2022
1 parent 57e3de1 commit d49e321
Show file tree
Hide file tree
Showing 19 changed files with 591 additions and 602 deletions.
33 changes: 25 additions & 8 deletions manta-accounting/src/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -859,22 +859,39 @@ macro_rules! impl_asset_map_for_maps_body {

#[inline]
fn select(&self, asset: Asset) -> Selection<Self> {
// TODO: Use a smarter coin-selection algorithm (max-heap?).
if asset.is_zero() {
return Selection::default();
}
let mut sum = Asset::zero(asset.id);
let mut values = Vec::new();
for (key, assets) in self {
for item in assets {
if item.value != AssetValue(0) && sum.try_add_assign(*item) {
values.push((key.clone(), item.value));
if sum.value >= asset.value {
break;
}
let mut min_max_asset: Option<($k, AssetValue)> = None;
let map = self
.iter()
.map(|(key, assets)| assets.iter().map(move |asset| (key, asset)))
.flatten()
.filter_map(|(key, item)| {
if !item.is_zero() && item.id == asset.id {
Some((key, item.value))
} else {
None
}
});
for (key, value) in map {
if value > asset.value {
min_max_asset = Some(match min_max_asset.take() {
Some(best) if value >= best.1 => best,
_ => (key.clone(), value),
});
} else if value == asset.value {
return Selection::new(Default::default(), vec![(key.clone(), value)]);
} else {
sum.add_assign(value);
values.push((key.clone(), value));
}
}
if let Some((best_key, best_value)) = min_max_asset {
return Selection::new(best_value - asset.value, vec![(best_key, best_value)]);
}
if sum.value < asset.value {
Selection::default()
} else {
Expand Down
168 changes: 71 additions & 97 deletions manta-accounting/src/wallet/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,136 +16,110 @@

//! Ledger Connection

use crate::transfer::{Configuration, EncryptedNote, TransferPost, Utxo, VoidNumber};
use alloc::vec::Vec;
use core::{fmt::Debug, hash::Hash};
use manta_util::future::LocalBoxFutureResult;

#[cfg(feature = "serde")]
use manta_util::serde::{Deserialize, Serialize};

/// Ledger Connection
///
/// This is the base `trait` for defining a connection with a ledger. To communicate with the
/// ledger, you can establish such a connection first and then interact via the [`Read`] and
/// [`Write`] traits which send messages along the connection.
pub trait Connection {
/// Error Type
///
/// This error type corresponds to the communication channel setup by the [`Connection`] rather
/// than any errors introduced by [`read`] or [`write`] methods. Instead those methods should
/// return errors in their `Response` types.
///
/// [`read`]: Read::read
/// [`write`]: Write::write
type Error;
}

/// Ledger Checkpoint
///
/// The checkpoint type is responsible for keeping the ledger, signer, and wallet in sync with each
/// other making sure that they all have the same view of the ledger state. Checkpoints should
/// be orderable with a bottom element returned by [`Default::default`].
pub trait Checkpoint: Default + PartialOrd {
/// Returns the index into the receiver set for the ledger.
fn receiver_index(&self) -> usize;
/// be orderable with a bottom element returned by [`Default::default`]. Types implementing this
/// `trait` must also implement [`Clone`] as it must be safe (but not necessarily efficient) to
/// copy a checkpoint value.
pub trait Checkpoint: Clone + Default + PartialOrd {}

/// Returns the index into the sender set for the ledger.
fn sender_index(&self) -> usize;
}

/// Ledger Pull Configuration
pub trait PullConfiguration<C>
/// Ledger Data Pruning
pub trait Prune<T>
where
C: Configuration,
T: Checkpoint,
{
/// Ledger State Checkpoint Type
type Checkpoint: Checkpoint;

/// Receiver Chunk Iterator Type
type ReceiverChunk: IntoIterator<Item = (Utxo<C>, EncryptedNote<C>)>;

/// Sender Chunk Iterator Type
type SenderChunk: IntoIterator<Item = VoidNumber<C>>;
/// Prunes the data in `self`, which was retrieved at `origin`, so that it meets the current
/// `checkpoint`, dropping data that is older than the given `checkpoint`. This method should
/// return `true` if it dropped data from `self`.
fn prune(&mut self, origin: &T, checkpoint: &T) -> bool;
}

/// Ledger Source Connection
pub trait Connection<C>: PullConfiguration<C>
where
C: Configuration,
{
/// Push Response Type
///
/// This is the return type of the [`push`](Self::push) method. Use this type to customize the
/// ledger's response to posting a set of transactions, valid or otherwise. In most cases `bool`
/// or some result type like `Result<(), Error>` is sufficient. In other cases where the ledger
/// cannot respond immediately to the [`push`](Self::push) command, a subscription token can be
/// returned instead which can be used to listen to the result later on.
type PushResponse;

/// Error Type
///
/// This error type corresponds to the communication channel itself setup by the [`Connection`]
/// rather than any errors introduced by the [`pull`](Self::pull) or [`push`](Self::push)
/// methods themselves which would correspond to an empty [`PullResponse`] or whatever error
/// variants are stored in [`PushResponse`](Self::PushResponse).
type Error;
/// Ledger Connection Reading
pub trait Read<D>: Connection {
/// Checkpoint Type
type Checkpoint: Checkpoint;

/// Pulls receiver data from the ledger starting from `checkpoint`, returning the current
/// [`Checkpoint`](PullConfiguration::Checkpoint).
fn pull<'s>(
/// Gets data from the ledger starting from `checkpoint`, returning the current
/// [`Checkpoint`](Self::Checkpoint).
fn read<'s>(
&'s mut self,
checkpoint: &'s Self::Checkpoint,
) -> LocalBoxFutureResult<'s, PullResponse<C, Self>, Self::Error>;

/// Sends `posts` to the ledger, returning `true` or `false` depending on whether the entire
/// batch succeeded or not.
fn push(
&mut self,
posts: Vec<TransferPost<C>>,
) -> LocalBoxFutureResult<Self::PushResponse, Self::Error>;
) -> LocalBoxFutureResult<'s, ReadResponse<Self::Checkpoint, D>, Self::Error>;
}

/// Ledger Source Pull Response
/// Ledger Connection Read Response
///
/// This `struct` is created by the [`pull`](Connection::pull) method on [`Connection`].
/// This `struct` is created by the [`read`](Read::read) method on [`Read`].
/// See its documentation for more.
#[cfg_attr(
feature = "serde",
derive(Deserialize, Serialize),
serde(
bound(
deserialize = r"
L::Checkpoint: Deserialize<'de>,
L::ReceiverChunk: Deserialize<'de>,
L::SenderChunk: Deserialize<'de>
",
serialize = r"
L::Checkpoint: Serialize,
L::ReceiverChunk: Serialize,
L::SenderChunk: Serialize
",
),
crate = "manta_util::serde",
deny_unknown_fields
)
)]
#[derive(derivative::Derivative)]
#[derivative(
Clone(bound = "L::Checkpoint: Clone, L::ReceiverChunk: Clone, L::SenderChunk: Clone"),
Copy(bound = "L::Checkpoint: Copy, L::ReceiverChunk: Copy, L::SenderChunk: Copy"),
Debug(bound = "L::Checkpoint: Debug, L::ReceiverChunk: Debug, L::SenderChunk: Debug"),
Default(bound = "L::Checkpoint: Default, L::ReceiverChunk: Default, L::SenderChunk: Default"),
Eq(bound = "L::Checkpoint: Eq, L::ReceiverChunk: Eq, L::SenderChunk: Eq"),
Hash(bound = "L::Checkpoint: Hash, L::ReceiverChunk: Hash, L::SenderChunk: Hash"),
PartialEq(
bound = "L::Checkpoint: PartialEq, L::ReceiverChunk: PartialEq, L::SenderChunk: PartialEq"
)
serde(crate = "manta_util::serde", deny_unknown_fields)
)]
pub struct PullResponse<C, L>
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct ReadResponse<T, D>
where
C: Configuration,
L: PullConfiguration<C> + ?Sized,
T: Checkpoint,
{
/// Pull Continuation Flag
/// Read Continuation Flag
///
/// The `should_continue` flag is set to `true` if the client should request more data from the
/// ledger to finish the pull.
/// ledger to finish the requested [`read`](Read::read).
pub should_continue: bool,

/// Ledger Checkpoint
/// Next Ledger Checkpoint
///
/// This checkpoint represents the new checkpoint that the client should be synchronized to when
/// incorporating the [`data`] returned by the [`read`] request. To continue the request the
/// client should send this checkpoint in their next call to [`read`].
///
/// If the `should_continue` flag is set to `true` then `checkpoint` is the next
/// [`Checkpoint`](PullConfiguration::Checkpoint) to request data from the ledger. Otherwise, it
/// represents the current ledger state.
pub checkpoint: L::Checkpoint,
/// [`data`]: Self::data
/// [`read`]: Read::read
pub next_checkpoint: T,

/// Ledger Receiver Chunk
pub receivers: L::ReceiverChunk,
/// Data Payload
///
/// This is the data payload that was returned by the ledger corresponding to the
/// [`read`](Read::read) request.
pub data: D,
}

/// Ledger Sender Chunk
pub senders: L::SenderChunk,
/// Ledger Connection Writing
pub trait Write<R>: Connection {
/// Ledger Response Type
///
/// This is the return type of the [`write`] method. Use this type to customize the ledger's
/// response to performing a [`write`] call, valid or otherwise. In most cases `bool` or some
/// result type like `Result<(), Error>` is sufficient. In other cases where the ledger cannot
/// respond immediately to the [`write`] command, a subscription token can be returned instead
/// which can be used to listen to the result later on.
type Response;

/// Sends the `request` to the ledger, returning its [`Response`](Self::Response).
fn write(&mut self, request: R) -> LocalBoxFutureResult<Self::Response, Self::Error>;
}
Loading

0 comments on commit d49e321

Please sign in to comment.