Skip to content

Commit c210dd1

Browse files
bingyanglinmuXxer
andcommitted
feat(gRPC): add GetEpoch implementation in gRPC (#9233)
# Description of change - Implemented`GetEpoch` in the gRPC API. - Verified the `get_epoch` simtest by using `cargo simtest --package iota-e2e-tests --test grpc -- get_epoch` ## Links to any relevant issues Part of #8688 ## How the change has been tested - [x] Basic tests (linting, compilation, formatting, unit/integration tests) - [ ] Patch-specific tests (correctness, functionality coverage) - [x] I have added tests that prove my fix is effective or that my feature works - [ ] I have checked that new and existing unit tests pass locally with my changes --------- Co-authored-by: muXxer <git@muxxer.de>
1 parent 8e06699 commit c210dd1

File tree

25 files changed

+786
-66
lines changed

25 files changed

+786
-66
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/iota-core/src/checkpoints/checkpoint_executor/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,9 @@ mod data_ingestion_handler;
6161
pub mod metrics;
6262
pub(crate) mod utils;
6363

64-
#[cfg(test)]
65-
pub(crate) mod tests;
64+
// TODO: re-enable or remove these tests https://github.com/iotaledger/iota/issues/9257
65+
//#[cfg(test)]
66+
// pub(crate) mod tests;
6667

6768
use data_ingestion_handler::{load_checkpoint_data, store_checkpoint_locally};
6869
use metrics::CheckpointExecutorMetrics;

crates/iota-core/src/rest_index.rs

Lines changed: 110 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use iota_types::{
2323
BackingPackageStore, DynamicFieldIndexInfo, DynamicFieldKey, EpochInfo, TransactionInfo,
2424
error::Error as StorageError,
2525
},
26-
transaction::{TransactionDataAPI, TransactionKind},
2726
};
2827
use move_core_types::language_storage::StructTag;
2928
use rayon::iter::{IntoParallelIterator, ParallelIterator};
@@ -134,8 +133,8 @@ struct IndexStoreTables {
134133

135134
/// An index of extra metadata for Epochs.
136135
///
137-
/// Only contains entries for transactions which have yet to be pruned from
138-
/// the main database.
136+
/// Only contains entries for epochs which have yet to be pruned from the
137+
/// main database.
139138
epochs: DBMap<EpochId, EpochInfo>,
140139

141140
/// An index of extra metadata for Transactions.
@@ -229,6 +228,8 @@ impl IndexStoreTables {
229228
self.index_existing_transactions(authority_store, checkpoint_store, checkpoint_range)?;
230229
}
231230

231+
self.initialize_current_epoch(authority_store, checkpoint_store)?;
232+
232233
let coin_index = Mutex::new(HashMap::new());
233234

234235
let make_live_object_indexer = RestParLiveObjectSetIndexer {
@@ -353,59 +354,130 @@ impl IndexStoreTables {
353354
checkpoint: &CheckpointData,
354355
batch: &mut typed_store::rocks::DBBatch,
355356
) -> Result<(), StorageError> {
356-
let CheckpointData {
357-
checkpoint_summary,
358-
transactions,
359-
..
360-
} = checkpoint;
361-
362-
let Some(_end_of_epoch) = checkpoint_summary.end_of_epoch_data.as_ref() else {
357+
let Some(epoch_info) = checkpoint.epoch_info()? else {
363358
return Ok(());
364359
};
365360

366-
let Some(transaction) = transactions.iter().find(|tx| {
367-
matches!(
368-
tx.transaction.intent_message().value.kind(),
369-
TransactionKind::EndOfEpochTransaction(_)
370-
)
371-
}) else {
372-
return Err(StorageError::custom(format!(
373-
"Failed to get end of epoch transaction in checkpoint {} with EndOfEpochData",
374-
checkpoint_summary.sequence_number,
375-
)));
361+
// We need to handle closing the previous epoch by updating the entry for it, if
362+
// it exists.
363+
if epoch_info.epoch > 0 {
364+
let prev_epoch = epoch_info.epoch - 1;
365+
366+
if let Some(mut previous_epoch) = self.epochs.get(&prev_epoch)? {
367+
previous_epoch.end_timestamp_ms = Some(epoch_info.start_timestamp_ms);
368+
previous_epoch.end_checkpoint = Some(epoch_info.start_checkpoint - 1);
369+
batch.insert_batch(&self.epochs, [(prev_epoch, previous_epoch)])?;
370+
}
371+
}
372+
373+
// Insert the current epoch info
374+
batch.insert_batch(&self.epochs, [(epoch_info.epoch, epoch_info)])?;
375+
376+
Ok(())
377+
}
378+
379+
// After attempting to reindex past epochs, ensure that the current epoch is at
380+
// least partially initialized
381+
fn initialize_current_epoch(
382+
&mut self,
383+
authority_store: &AuthorityStore,
384+
checkpoint_store: &CheckpointStore,
385+
) -> Result<(), StorageError> {
386+
let Some(checkpoint) = checkpoint_store.get_highest_executed_checkpoint()? else {
387+
return Ok(());
376388
};
377389

378-
// We need to handle closing out the current epoch by updating the entry for
379-
// this epoch in case it exists.
380-
if let Some(mut current_epoch) = self.epochs.get(&checkpoint_summary.epoch)? {
381-
current_epoch.end_timestamp_ms = Some(checkpoint_summary.timestamp_ms);
382-
current_epoch.end_checkpoint = Some(checkpoint_summary.sequence_number);
383-
batch.insert_batch(&self.epochs, [(current_epoch.epoch, current_epoch)])?;
390+
if self.epochs.get(&checkpoint.epoch)?.is_some() {
391+
// no need to initialize if it already exists
392+
return Ok(());
384393
}
385394

386-
let system_state = iota_types::iota_system_state::get_iota_system_state(
387-
&transaction.output_objects.as_slice(),
388-
)
389-
.map_err(|e| {
390-
StorageError::custom(format!(
391-
"Failed to find system state object output from end of epoch transaction: {e}"
392-
))
393-
})?;
394-
let next_epoch = EpochInfo {
395-
epoch: system_state.epoch(),
395+
let system_state = iota_types::iota_system_state::get_iota_system_state(authority_store)
396+
.map_err(|e| StorageError::custom(format!("Failed to find system state: {e}")))?;
397+
398+
// Determine the start checkpoint of the current epoch
399+
let start_checkpoint = if checkpoint.epoch != 0 {
400+
let previous_epoch = checkpoint.epoch - 1;
401+
402+
// Find the last checkpoint of the previous epoch
403+
if let Some(previous_epoch_info) = self.epochs.get(&previous_epoch)? {
404+
if let Some(end_checkpoint) = previous_epoch_info.end_checkpoint {
405+
end_checkpoint + 1
406+
} else {
407+
// Fall back to scanning checkpoints if the end_checkpoint is None
408+
self.scan_for_epoch_start_checkpoint(
409+
checkpoint_store,
410+
checkpoint.sequence_number,
411+
previous_epoch,
412+
)?
413+
}
414+
} else {
415+
// Fall back to scanning checkpoints if the previous epoch info is missing
416+
self.scan_for_epoch_start_checkpoint(
417+
checkpoint_store,
418+
checkpoint.sequence_number,
419+
previous_epoch,
420+
)?
421+
}
422+
} else {
423+
// First epoch starts at checkpoint 0
424+
0
425+
};
426+
427+
let epoch_info = EpochInfo {
428+
epoch: checkpoint.epoch,
396429
protocol_version: system_state.protocol_version(),
397430
start_timestamp_ms: system_state.epoch_start_timestamp_ms(),
398431
end_timestamp_ms: None,
399-
start_checkpoint: checkpoint_summary.sequence_number + 1,
432+
start_checkpoint,
400433
end_checkpoint: None,
401434
reference_gas_price: system_state.reference_gas_price(),
402435
system_state,
403436
};
404-
batch.insert_batch(&self.epochs, [(next_epoch.epoch, next_epoch)])?;
437+
438+
self.epochs.insert(&epoch_info.epoch, &epoch_info)?;
405439

406440
Ok(())
407441
}
408442

443+
fn scan_for_epoch_start_checkpoint(
444+
&self,
445+
checkpoint_store: &CheckpointStore,
446+
current_checkpoint_seq_number: u64,
447+
previous_epoch: EpochId,
448+
) -> Result<u64, StorageError> {
449+
// Scan from current checkpoint backwards to 0 to find the start of this epoch.
450+
let mut last_checkpoint_seq_number_of_prev_epoch = None;
451+
for seq in (0..=current_checkpoint_seq_number).rev() {
452+
let Some(chkpt) = checkpoint_store
453+
.get_checkpoint_by_sequence_number(seq)
454+
.ok()
455+
.flatten()
456+
else {
457+
// continue if there is a gap in the checkpoints
458+
continue;
459+
};
460+
461+
if chkpt.epoch < previous_epoch {
462+
// we must stop searching if we are past the previous epoch
463+
break;
464+
}
465+
466+
if chkpt.epoch == previous_epoch && chkpt.end_of_epoch_data.is_some() {
467+
// We found the checkpoint with end of epoch data for the previous epoch
468+
last_checkpoint_seq_number_of_prev_epoch = Some(chkpt.sequence_number);
469+
break;
470+
}
471+
}
472+
473+
let last_checkpoint_seq_number_of_prev_epoch = last_checkpoint_seq_number_of_prev_epoch
474+
.ok_or(StorageError::custom(format!(
475+
"Failed to get the last checkpoint of the previous epoch {previous_epoch}",
476+
)))?;
477+
478+
Ok(last_checkpoint_seq_number_of_prev_epoch + 1)
479+
}
480+
409481
fn index_transactions(
410482
&self,
411483
checkpoint: &CheckpointData,

crates/iota-e2e-tests/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ passkey-authenticator.workspace = true
3131
passkey-client.workspace = true
3232
passkey-types.workspace = true
3333
prometheus.workspace = true
34+
prost-types.workspace = true
3435
rand.workspace = true
3536
serde.workspace = true
3637
serde_json.workspace = true
@@ -44,6 +45,7 @@ iota-config.workspace = true
4445
iota-core.workspace = true
4546
iota-framework.workspace = true
4647
iota-genesis-builder.workspace = true
48+
iota-grpc-types.workspace = true
4749
iota-json-rpc.workspace = true
4850
iota-json-rpc-api.workspace = true
4951
iota-json-rpc-types.workspace = true
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
// Copyright (c) Mysten Labs, Inc.
2+
// Modifications Copyright (c) 2025 IOTA Stiftung
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
mod v0;
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright (c) Mysten Labs, Inc.
2+
// Modifications Copyright (c) 2025 IOTA Stiftung
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
use iota_grpc_types::v0::ledger_service::{
6+
GetEpochRequest, ledger_service_client::LedgerServiceClient,
7+
};
8+
use iota_macros::sim_test;
9+
use prost_types::FieldMask;
10+
use test_cluster::TestClusterBuilder;
11+
12+
#[sim_test]
13+
async fn get_epoch() {
14+
let test_cluster = TestClusterBuilder::new()
15+
.with_fullnode_enable_grpc_api(true)
16+
.build()
17+
.await;
18+
19+
// Wait for at least one checkpoint to be created
20+
test_cluster.wait_for_checkpoint(1, None).await;
21+
22+
let mut client = LedgerServiceClient::connect(test_cluster.grpc_url())
23+
.await
24+
.unwrap();
25+
26+
// Get current epoch (no epoch specified means current epoch)
27+
let latest_epoch_response = client
28+
.get_epoch(GetEpochRequest {
29+
epoch: None,
30+
read_mask: None,
31+
})
32+
.await
33+
.unwrap()
34+
.into_inner();
35+
36+
let latest_epoch = latest_epoch_response.epoch.unwrap();
37+
38+
// Get epoch 0
39+
let epoch_0_response = client
40+
.get_epoch(GetEpochRequest {
41+
epoch: Some(0),
42+
read_mask: None,
43+
})
44+
.await
45+
.unwrap()
46+
.into_inner();
47+
48+
let epoch_0 = epoch_0_response.epoch.unwrap();
49+
50+
assert_eq!(latest_epoch.committee, epoch_0.committee);
51+
52+
assert_eq!(epoch_0.epoch, Some(0));
53+
assert_eq!(epoch_0.first_checkpoint, Some(0));
54+
55+
// Ensure that fetching the system state for the epoch works (using field mask)
56+
let epoch_with_bcs = client
57+
.get_epoch(GetEpochRequest {
58+
epoch: None,
59+
read_mask: Some(FieldMask {
60+
paths: vec!["bcs_system_state".to_string()],
61+
}),
62+
})
63+
.await
64+
.unwrap()
65+
.into_inner()
66+
.epoch
67+
.unwrap();
68+
assert!(epoch_with_bcs.bcs_system_state.is_some());
69+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
// Copyright (c) 2025 IOTA Stiftung
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
mod get_epoch;
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
// Copyright (c) 2025 IOTA Stiftung
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
mod ledger_service;

crates/iota-grpc-server/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ workspace = true
1313
# external dependencies
1414
anyhow.workspace = true
1515
async-stream = "0.3"
16+
bcs.workspace = true
1617
futures.workspace = true
18+
prost-types.workspace = true
1719
serde.workspace = true
1820
tokio = { workspace = true, features = ["rt-multi-thread"] }
1921
tokio-stream.workspace = true
@@ -26,5 +28,6 @@ iota-config.workspace = true
2628
iota-core.workspace = true
2729
iota-grpc-types.workspace = true
2830
iota-json-rpc-types.workspace = true
31+
iota-protocol-config.workspace = true
2932
iota-types.workspace = true
3033
move-core-types.workspace = true

0 commit comments

Comments
 (0)