Skip to content
This repository was archived by the owner on Apr 25, 2022. It is now read-only.

Commit 8ed60a8

Browse files
committed
proto: get_latest_transactions
1 parent 35058cf commit 8ed60a8

File tree

8 files changed

+285
-42
lines changed

8 files changed

+285
-42
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ resolver = "2"
1010
drop = { git = "https://github.com/Distributed-EPFL/drop" }
1111
sieve = { git = "https://github.com/Distributed-EPFL/sieve" }
1212
bincode = "1.3.3"
13+
chrono = { version = "0.4", features = ["serde"] }
1314
http = "0.2"
1415
prost = { version = "0.8", default-features = false }
1516
serde = { version = "1", features = ["derive"] }

src/at2.proto

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,14 @@ service AT2 {
55
rpc SendAsset (SendAssetRequest) returns (SendAssetReply);
66
rpc GetBalance (GetBalanceRequest) returns (GetBalanceReply);
77
rpc GetLastSequence (GetLastSequenceRequest) returns (GetLastSequenceReply);
8+
rpc GetLatestTransactions (GetLatestTransactionsRequest) returns (GetLatestTransactionsReply);
89
}
910

1011
message SendAssetRequest {
1112
bytes sender = 1;
1213
uint32 sequence = 2;
13-
14-
bytes receiver = 3;
14+
bytes recipient = 3;
1515
uint64 amount = 4;
16-
1716
bytes signature = 5;
1817
}
1918
message SendAssetReply {}
@@ -31,3 +30,15 @@ message GetLastSequenceRequest {
3130
message GetLastSequenceReply {
3231
uint32 sequence = 1;
3332
}
33+
34+
message ProcessedTransaction {
35+
string timestamp = 1;
36+
bytes sender = 2;
37+
bytes recipient = 3;
38+
uint64 amount = 4;
39+
}
40+
41+
message GetLatestTransactionsRequest {}
42+
message GetLatestTransactionsReply {
43+
repeated ProcessedTransaction transactions = 1;
44+
}

src/bin/client/main.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ enum Commands {
2424
},
2525
GetBalance,
2626
GetLastSequence,
27+
GetLatestTransactions,
2728
}
2829

2930
#[derive(Debug, StructOpt)]
@@ -52,6 +53,8 @@ enum CommandsError {
5253
GetBalance { source: CommandError },
5354
#[snafu(display("get last sequence: {}", source))]
5455
GetLastSequence { source: CommandError },
56+
#[snafu(display("get latest transactions: {}", source))]
57+
GetLatestTransactions { source: CommandError },
5558
}
5659

5760
fn config(cmd: CommandsConfig) -> Result<(), config::Error> {
@@ -120,6 +123,25 @@ async fn get_last_sequence() -> Result<(), CommandError> {
120123
Ok(())
121124
}
122125

126+
async fn get_latest_transactions() -> Result<(), CommandError> {
127+
let config = config::from_reader(stdin()).context(ReadConfig)?;
128+
129+
Client::new(config.rpc_address)
130+
.context(ClientError)?
131+
.get_latest_transactions()
132+
.await
133+
.context(ClientError)?
134+
.iter()
135+
.for_each(|tx| {
136+
println!(
137+
"{}: {} send {}¤ to {}",
138+
tx.timestamp, tx.sender, tx.amount, tx.recipient,
139+
)
140+
});
141+
142+
Ok(())
143+
}
144+
123145
#[tokio::main(flavor = "multi_thread")]
124146
async fn main() {
125147
let ret = match Commands::from_args() {
@@ -133,6 +155,9 @@ async fn main() {
133155
.context(SendAsset),
134156
Commands::GetBalance => get_balance().await.context(GetBalance),
135157
Commands::GetLastSequence => get_last_sequence().await.context(GetLastSequence),
158+
Commands::GetLatestTransactions => get_latest_transactions()
159+
.await
160+
.context(GetLatestTransactions),
136161
};
137162

138163
if let Err(err) = ret {

src/bin/server/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use tracing_fmt::FmtSubscriber;
1111
mod accounts;
1212
mod config;
1313
mod rpc;
14+
mod recent_transactions;
1415

1516
#[derive(Debug, StructOpt)]
1617
enum Commands {
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
use std::collections::VecDeque;
2+
3+
use at2_node::{FullTransaction, ThinTransaction};
4+
use drop::crypto::sign;
5+
use tokio::sync::{mpsc, oneshot};
6+
7+
const LATEST_TRANSACTIONS_MAX_SIZE: usize = 10;
8+
9+
#[derive(snafu::Snafu, Debug)]
10+
pub enum Error {
11+
#[snafu(display("gone on send"))]
12+
GoneOnSend,
13+
#[snafu(display("gone on recv"))]
14+
GoneOnRecv,
15+
}
16+
17+
#[derive(Debug)]
18+
enum Commands {
19+
Put {
20+
thin: ThinTransaction,
21+
sender: Box<sign::PublicKey>,
22+
resp: oneshot::Sender<()>,
23+
},
24+
GetAll {
25+
resp: oneshot::Sender<Vec<FullTransaction>>,
26+
},
27+
}
28+
29+
#[derive(Clone)]
30+
pub struct RecentTransactions {
31+
agent: mpsc::Sender<Commands>,
32+
}
33+
34+
/// Own the recent transactions
35+
struct RecentTransactionsHandler(VecDeque<FullTransaction>);
36+
37+
impl RecentTransactions {
38+
pub fn new() -> Self {
39+
Self {
40+
agent: RecentTransactionsHandler::new().spawn(),
41+
}
42+
}
43+
44+
/// Add a new transaction
45+
pub async fn put(
46+
&self,
47+
sender: Box<sign::PublicKey>,
48+
thin: ThinTransaction,
49+
) -> Result<(), Error> {
50+
let (tx, rx) = oneshot::channel();
51+
52+
self.agent
53+
.send(Commands::Put {
54+
sender,
55+
thin,
56+
resp: tx,
57+
})
58+
.await
59+
.map_err(|_| Error::GoneOnSend)?;
60+
61+
rx.await.map_err(|_| Error::GoneOnRecv)
62+
}
63+
64+
/// Return the recently seen transactions
65+
pub async fn get_all(&self) -> Result<Vec<FullTransaction>, Error> {
66+
let (tx, rx) = oneshot::channel();
67+
68+
self.agent
69+
.send(Commands::GetAll { resp: tx })
70+
.await
71+
.map_err(|_| Error::GoneOnSend)?;
72+
73+
rx.await.map_err(|_| Error::GoneOnRecv)
74+
}
75+
}
76+
77+
impl RecentTransactionsHandler {
78+
fn new() -> Self {
79+
Self(VecDeque::with_capacity(LATEST_TRANSACTIONS_MAX_SIZE))
80+
}
81+
82+
fn spawn(mut self) -> mpsc::Sender<Commands> {
83+
let (tx, mut rx) = mpsc::channel(32);
84+
85+
tokio::spawn(async move {
86+
while let Some(cmd) = rx.recv().await {
87+
match cmd {
88+
Commands::Put { sender, thin, resp } => {
89+
self.put(*sender, thin);
90+
let _ = resp.send(());
91+
}
92+
Commands::GetAll { resp } => {
93+
let _ = resp.send(self.get_all());
94+
}
95+
}
96+
}
97+
});
98+
99+
tx
100+
}
101+
102+
fn put(&mut self, sender: sign::PublicKey, thin: ThinTransaction) {
103+
if self.0.len() == LATEST_TRANSACTIONS_MAX_SIZE {
104+
self.0.pop_front();
105+
}
106+
let full = FullTransaction::with_thin(sender, thin);
107+
self.0.push_back(full);
108+
}
109+
110+
fn get_all(&self) -> Vec<FullTransaction> {
111+
self.0.clone().into()
112+
}
113+
}

0 commit comments

Comments
 (0)