Skip to content
This repository was archived by the owner on Apr 25, 2022. It is now read-only.

Commit 9cd0b6e

Browse files
committed
FullTransaction: add state
1 parent f861907 commit 9cd0b6e

File tree

6 files changed

+198
-51
lines changed

6 files changed

+198
-51
lines changed

src/at2.proto

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,21 @@ message GetLastSequenceReply {
3131
uint32 sequence = 1;
3232
}
3333

34-
message ProcessedTransaction {
34+
message FullTransaction {
3535
string timestamp = 1;
3636
bytes sender = 2;
3737
bytes recipient = 3;
3838
uint64 amount = 4;
39+
enum State {
40+
Pending = 0;
41+
Success = 1;
42+
Failure = 2;
43+
}
44+
State state = 5;
45+
uint32 sender_sequence = 6;
3946
}
4047

4148
message GetLatestTransactionsRequest {}
4249
message GetLatestTransactionsReply {
43-
repeated ProcessedTransaction transactions = 1;
50+
repeated FullTransaction transactions = 1;
4451
}

src/bin/client/main.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use std::io::{stdin, stdout};
22

3-
use at2_node::client::{self, Client};
3+
use at2_node::{
4+
client::{self, Client},
5+
TransactionState,
6+
};
47
use drop::crypto::sign;
58
use hex::FromHex;
69
use http::Uri;
@@ -134,8 +137,16 @@ async fn get_latest_transactions() -> Result<(), CommandError> {
134137
.iter()
135138
.for_each(|tx| {
136139
println!(
137-
"{}: {} send {}¤ to {}",
138-
tx.timestamp, tx.sender, tx.amount, tx.recipient,
140+
"{}: {} send {}¤ to {} ({})",
141+
tx.timestamp,
142+
tx.sender,
143+
tx.amount,
144+
tx.recipient,
145+
match tx.state {
146+
TransactionState::Pending => "pending",
147+
TransactionState::Success => "success",
148+
TransactionState::Failure => "failure",
149+
},
139150
)
140151
});
141152

src/bin/server/recent_transactions.rs

Lines changed: 95 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::collections::VecDeque;
22

3-
use at2_node::{FullTransaction, ThinTransaction};
3+
use at2_node::{FullTransaction, ThinTransaction, TransactionState};
44
use drop::crypto::sign;
5+
use snafu::ensure;
56
use tokio::sync::{mpsc, oneshot};
67

78
const LATEST_TRANSACTIONS_MAX_SIZE: usize = 10;
@@ -12,13 +13,23 @@ pub enum Error {
1213
GoneOnSend,
1314
#[snafu(display("gone on recv"))]
1415
GoneOnRecv,
16+
PutAlreadyExisting,
1517
}
1618

19+
type Result<T> = std::result::Result<T, Error>;
20+
1721
#[derive(Debug)]
1822
enum Commands {
1923
Put {
24+
sender: Box<sign::PublicKey>,
25+
sender_sequence: sieve::Sequence,
2026
thin: ThinTransaction,
27+
resp: oneshot::Sender<Result<()>>,
28+
},
29+
Update {
2130
sender: Box<sign::PublicKey>,
31+
sender_sequence: sieve::Sequence,
32+
state: TransactionState,
2233
resp: oneshot::Sender<()>,
2334
},
2435
GetAll {
@@ -46,24 +57,48 @@ impl RecentTransactions {
4657
pub async fn put(
4758
&self,
4859
sender: Box<sign::PublicKey>,
60+
sender_sequence: sieve::Sequence,
4961
thin: ThinTransaction,
50-
) -> Result<(), Error> {
62+
) -> Result<()> {
5163
let (tx, rx) = oneshot::channel();
5264

5365
self.agent
5466
.send(Commands::Put {
5567
sender,
68+
sender_sequence,
5669
thin,
5770
resp: tx,
5871
})
5972
.await
6073
.map_err(|_| Error::GoneOnSend)?;
6174

75+
rx.await.map_err(|_| Error::GoneOnRecv)?
76+
}
77+
78+
/// Update an already put transaction, to resolve its state
79+
pub async fn update(
80+
&self,
81+
sender: Box<sign::PublicKey>,
82+
sender_sequence: sieve::Sequence,
83+
state: TransactionState,
84+
) -> Result<()> {
85+
let (tx, rx) = oneshot::channel();
86+
87+
self.agent
88+
.send(Commands::Update {
89+
sender,
90+
sender_sequence,
91+
state,
92+
resp: tx,
93+
})
94+
.await
95+
.map_err(|_| Error::GoneOnSend)?;
96+
6297
rx.await.map_err(|_| Error::GoneOnRecv)
6398
}
6499

65100
/// Return the recently seen transactions
66-
pub async fn get_all(&self) -> Result<Vec<FullTransaction>, Error> {
101+
pub async fn get_all(&self) -> Result<Vec<FullTransaction>> {
67102
let (tx, rx) = oneshot::channel();
68103

69104
self.agent
@@ -86,8 +121,21 @@ impl RecentTransactionsHandler {
86121
tokio::spawn(async move {
87122
while let Some(cmd) = rx.recv().await {
88123
match cmd {
89-
Commands::Put { sender, thin, resp } => {
90-
self.put(*sender, thin);
124+
Commands::Put {
125+
sender,
126+
sender_sequence,
127+
thin,
128+
resp,
129+
} => {
130+
let _ = resp.send(self.put(*sender, sender_sequence, thin));
131+
}
132+
Commands::Update {
133+
sender,
134+
sender_sequence,
135+
state,
136+
resp,
137+
} => {
138+
self.update(*sender, sender_sequence, state);
91139
let _ = resp.send(());
92140
}
93141
Commands::GetAll { resp } => {
@@ -100,12 +148,52 @@ impl RecentTransactionsHandler {
100148
tx
101149
}
102150

103-
fn put(&mut self, sender: sign::PublicKey, thin: ThinTransaction) {
151+
fn put(
152+
&mut self,
153+
sender: sign::PublicKey,
154+
sender_sequence: sieve::Sequence,
155+
thin: ThinTransaction,
156+
) -> Result<()> {
157+
ensure!(
158+
!self
159+
.0
160+
.iter()
161+
.any(|tx| tx.sender_sequence == sender_sequence && tx.sender == sender),
162+
PutAlreadyExisting
163+
);
164+
165+
let full = FullTransaction {
166+
timestamp: chrono::Utc::now(),
167+
sender,
168+
sender_sequence,
169+
recipient: thin.recipient,
170+
amount: thin.amount,
171+
state: TransactionState::Pending,
172+
};
173+
104174
if self.0.len() == LATEST_TRANSACTIONS_MAX_SIZE {
105175
self.0.pop_front();
106176
}
107-
let full = FullTransaction::with_thin(sender, thin);
177+
108178
self.0.push_back(full);
179+
180+
Ok(())
181+
}
182+
183+
fn update(
184+
&mut self,
185+
sender: sign::PublicKey,
186+
sender_sequence: sieve::Sequence,
187+
state: TransactionState,
188+
) {
189+
// NOP if not found as the transaction may resolve late
190+
if let Some(tx) = self
191+
.0
192+
.iter_mut()
193+
.find(|tx| tx.sender_sequence == sender_sequence && tx.sender == sender)
194+
{
195+
tx.state = state;
196+
}
109197
}
110198

111199
fn get_all(&self) -> Vec<FullTransaction> {

src/bin/server/rpc.rs

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use std::fmt;
22

3-
use at2_node::{proto, ThinTransaction};
3+
use at2_node::{
4+
proto::{self, *},
5+
ThinTransaction, TransactionState,
6+
};
47
use contagion::{Contagion, ContagionConfig, ContagionMessage};
58
use drop::{
69
crypto::key::exchange::{self, Exchanger},
@@ -168,22 +171,31 @@ impl Service {
168171

169172
let sender = Box::new(msg.sender().to_owned());
170173

171-
self.accounts
174+
let processed = self
175+
.accounts
172176
.transfer(
173177
sender.clone(),
174178
msg.sequence(),
175179
Box::new(msg.payload().recipient),
176180
msg.payload().amount,
177181
)
178182
.await
179-
.context(ProcessTxForAccounts)?;
183+
.context(ProcessTxForAccounts);
180184

181185
self.recent_transactions
182-
.put(sender, msg.payload().to_owned())
186+
.update(
187+
sender,
188+
msg.sequence(),
189+
if processed.is_ok() {
190+
TransactionState::Success
191+
} else {
192+
TransactionState::Failure
193+
},
194+
)
183195
.await
184196
.context(ProcessTxForRecent)?;
185197

186-
Ok(())
198+
processed
187199
}
188200
}
189201

@@ -204,49 +216,57 @@ impl From<recent_transactions::Error> for tonic::Status {
204216
}
205217

206218
#[tonic::async_trait]
207-
impl proto::at2_server::At2 for Service {
219+
impl at2_server::At2 for Service {
208220
async fn send_asset(
209221
&self,
210-
request: tonic::Request<proto::SendAssetRequest>,
211-
) -> Result<tonic::Response<proto::SendAssetReply>, tonic::Status> {
222+
request: tonic::Request<SendAssetRequest>,
223+
) -> Result<tonic::Response<SendAssetReply>, tonic::Status> {
212224
let message = request.into_inner();
213225

226+
let thin = at2_node::ThinTransaction {
227+
recipient: bincode::deserialize(&message.recipient).context(Deserialize)?,
228+
amount: message.amount,
229+
};
230+
231+
let sender = bincode::deserialize(&message.sender).context(Deserialize)?;
232+
233+
self.recent_transactions
234+
.put(Box::new(sender), message.sequence, thin.clone())
235+
.await?;
236+
214237
self.handle
215238
.clone()
216239
.broadcast(&sieve::Payload::new(
217-
bincode::deserialize(&message.sender).context(Deserialize)?,
240+
sender,
218241
message.sequence,
219-
at2_node::ThinTransaction {
220-
recipient: bincode::deserialize(&message.recipient).context(Deserialize)?,
221-
amount: message.amount,
222-
},
242+
thin,
223243
bincode::deserialize(&message.signature).context(Deserialize)?,
224244
))
225245
.await
226-
.expect("broadcasting failed");
246+
.map_err(|err| tonic::Status::invalid_argument(err.to_string()))?;
227247

228-
Ok(Response::new(proto::SendAssetReply {}))
248+
Ok(Response::new(SendAssetReply {}))
229249
}
230250

231251
async fn get_last_sequence(
232252
&self,
233-
request: tonic::Request<proto::GetLastSequenceRequest>,
234-
) -> Result<tonic::Response<proto::GetLastSequenceReply>, tonic::Status> {
253+
request: tonic::Request<GetLastSequenceRequest>,
254+
) -> Result<tonic::Response<GetLastSequenceReply>, tonic::Status> {
235255
let sequence = self
236256
.accounts
237257
.get_last_sequence(
238258
bincode::deserialize(&request.get_ref().sender).context(Deserialize)?,
239259
)
240260
.await?;
241261

242-
Ok(Response::new(proto::GetLastSequenceReply { sequence }))
262+
Ok(Response::new(GetLastSequenceReply { sequence }))
243263
}
244264

245265
async fn get_balance(
246266
&self,
247-
request: tonic::Request<proto::GetBalanceRequest>,
248-
) -> Result<tonic::Response<proto::GetBalanceReply>, tonic::Status> {
249-
Ok(Response::new(proto::GetBalanceReply {
267+
request: tonic::Request<GetBalanceRequest>,
268+
) -> Result<tonic::Response<GetBalanceReply>, tonic::Status> {
269+
Ok(Response::new(GetBalanceReply {
250270
amount: self
251271
.accounts
252272
.get_balance(bincode::deserialize(&request.get_ref().sender).context(Deserialize)?)
@@ -256,20 +276,28 @@ impl proto::at2_server::At2 for Service {
256276

257277
async fn get_latest_transactions(
258278
&self,
259-
_: tonic::Request<proto::GetLatestTransactionsRequest>,
260-
) -> Result<tonic::Response<proto::GetLatestTransactionsReply>, tonic::Status> {
261-
Ok(Response::new(proto::GetLatestTransactionsReply {
279+
_: tonic::Request<GetLatestTransactionsRequest>,
280+
) -> Result<tonic::Response<GetLatestTransactionsReply>, tonic::Status> {
281+
use full_transaction::State;
282+
283+
Ok(Response::new(GetLatestTransactionsReply {
262284
transactions: self
263285
.recent_transactions
264286
.get_all()
265287
.await?
266288
.iter()
267289
.map(|tx| {
268-
Ok(proto::ProcessedTransaction {
290+
Ok(proto::FullTransaction {
269291
timestamp: tx.timestamp.to_rfc3339(),
270292
sender: bincode::serialize(&tx.sender).context(Serialize)?,
293+
sender_sequence: tx.sender_sequence,
271294
recipient: bincode::serialize(&tx.recipient).context(Serialize)?,
272295
amount: tx.amount,
296+
state: match tx.state {
297+
TransactionState::Pending => State::Pending as i32,
298+
TransactionState::Success => State::Success as i32,
299+
TransactionState::Failure => State::Failure as i32,
300+
},
273301
})
274302
})
275303
.collect::<Result<_, ProtoError>>()?,

0 commit comments

Comments
 (0)