Skip to content

Commit 076a1c3

Browse files
authored
Data column sidecar event (#7587)
N/A Implement events for data column sidecar ethereum/beacon-APIs#535
1 parent 7416d06 commit 076a1c3

File tree

7 files changed

+188
-12
lines changed

7 files changed

+188
-12
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,9 @@ use crate::{
7373
kzg_utils, metrics, AvailabilityPendingExecutedBlock, BeaconChainError, BeaconForkChoiceStore,
7474
BeaconSnapshot, CachedHead,
7575
};
76-
use eth2::types::{EventKind, SseBlobSidecar, SseBlock, SseExtendedPayloadAttributes};
76+
use eth2::types::{
77+
EventKind, SseBlobSidecar, SseBlock, SseDataColumnSidecar, SseExtendedPayloadAttributes,
78+
};
7779
use execution_layer::{
7880
BlockProposalContents, BlockProposalContentsType, BuilderParams, ChainHealth, ExecutionLayer,
7981
FailedCondition, PayloadAttributes, PayloadStatus,
@@ -3087,6 +3089,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
30873089
return Err(BlockError::DuplicateFullyImported(block_root));
30883090
}
30893091

3092+
self.emit_sse_data_column_sidecar_events(
3093+
&block_root,
3094+
data_columns.iter().map(|column| column.as_data_column()),
3095+
);
3096+
30903097
let r = self
30913098
.check_gossip_data_columns_availability_and_import(
30923099
slot,
@@ -3158,10 +3165,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
31583165
return Err(BlockError::DuplicateFullyImported(block_root));
31593166
}
31603167

3161-
// process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS
3162-
// consumers don't expect the blobs event to fire erratically.
3163-
if let EngineGetBlobsOutput::Blobs(blobs) = &engine_get_blobs_output {
3164-
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().map(|b| b.as_blob()));
3168+
match &engine_get_blobs_output {
3169+
EngineGetBlobsOutput::Blobs(blobs) => {
3170+
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().map(|b| b.as_blob()));
3171+
}
3172+
EngineGetBlobsOutput::CustodyColumns(columns) => {
3173+
self.emit_sse_data_column_sidecar_events(
3174+
&block_root,
3175+
columns.iter().map(|column| column.as_data_column()),
3176+
);
3177+
}
31653178
}
31663179

31673180
let r = self
@@ -3191,6 +3204,31 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
31913204
}
31923205
}
31933206

3207+
fn emit_sse_data_column_sidecar_events<'a, I>(
3208+
self: &Arc<Self>,
3209+
block_root: &Hash256,
3210+
data_columns_iter: I,
3211+
) where
3212+
I: Iterator<Item = &'a DataColumnSidecar<T::EthSpec>>,
3213+
{
3214+
if let Some(event_handler) = self.event_handler.as_ref() {
3215+
if event_handler.has_data_column_sidecar_subscribers() {
3216+
let imported_data_columns = self
3217+
.data_availability_checker
3218+
.cached_data_column_indexes(block_root)
3219+
.unwrap_or_default();
3220+
let new_data_columns =
3221+
data_columns_iter.filter(|b| !imported_data_columns.contains(&b.index));
3222+
3223+
for data_column in new_data_columns {
3224+
event_handler.register(EventKind::DataColumnSidecar(
3225+
SseDataColumnSidecar::from_data_column_sidecar(data_column),
3226+
));
3227+
}
3228+
}
3229+
}
3230+
}
3231+
31943232
/// Cache the columns in the processing cache, process it, then evict it from the cache if it was
31953233
/// imported or errors.
31963234
pub async fn process_rpc_custody_columns(
@@ -3231,6 +3269,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
32313269
}
32323270
}
32333271

3272+
self.emit_sse_data_column_sidecar_events(
3273+
&block_root,
3274+
custody_columns.iter().map(|column| column.as_ref()),
3275+
);
3276+
32343277
let r = self
32353278
.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
32363279
.await;

beacon_node/beacon_chain/src/data_column_verification.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,7 @@ impl<T: BeaconChainTypes, O: ObservationStrategy> GossipVerifiedDataColumn<T, O>
215215
}
216216

217217
/// Create a `GossipVerifiedDataColumn` from `DataColumnSidecar` for testing ONLY.
218-
#[cfg(test)]
219-
pub(crate) fn __new_for_testing(column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>) -> Self {
218+
pub fn __new_for_testing(column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>) -> Self {
220219
Self {
221220
block_root: column_sidecar.block_root(),
222221
data_column: KzgVerifiedDataColumn::__new_for_testing(column_sidecar),
@@ -268,7 +267,6 @@ impl<E: EthSpec> KzgVerifiedDataColumn<E> {
268267
}
269268

270269
/// Create a `KzgVerifiedDataColumn` from `DataColumnSidecar` for testing ONLY.
271-
#[cfg(test)]
272270
pub(crate) fn __new_for_testing(data_column: Arc<DataColumnSidecar<E>>) -> Self {
273271
Self { data: data_column }
274272
}

beacon_node/beacon_chain/src/events.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub struct ServerSentEventHandler<E: EthSpec> {
1111
single_attestation_tx: Sender<EventKind<E>>,
1212
block_tx: Sender<EventKind<E>>,
1313
blob_sidecar_tx: Sender<EventKind<E>>,
14+
data_column_sidecar_tx: Sender<EventKind<E>>,
1415
finalized_tx: Sender<EventKind<E>>,
1516
head_tx: Sender<EventKind<E>>,
1617
exit_tx: Sender<EventKind<E>>,
@@ -37,6 +38,7 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
3738
let (single_attestation_tx, _) = broadcast::channel(capacity);
3839
let (block_tx, _) = broadcast::channel(capacity);
3940
let (blob_sidecar_tx, _) = broadcast::channel(capacity);
41+
let (data_column_sidecar_tx, _) = broadcast::channel(capacity);
4042
let (finalized_tx, _) = broadcast::channel(capacity);
4143
let (head_tx, _) = broadcast::channel(capacity);
4244
let (exit_tx, _) = broadcast::channel(capacity);
@@ -57,6 +59,7 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
5759
single_attestation_tx,
5860
block_tx,
5961
blob_sidecar_tx,
62+
data_column_sidecar_tx,
6063
finalized_tx,
6164
head_tx,
6265
exit_tx,
@@ -99,6 +102,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
99102
.blob_sidecar_tx
100103
.send(kind)
101104
.map(|count| log_count("blob sidecar", count)),
105+
EventKind::DataColumnSidecar(_) => self
106+
.data_column_sidecar_tx
107+
.send(kind)
108+
.map(|count| log_count("data_column_sidecar", count)),
102109
EventKind::FinalizedCheckpoint(_) => self
103110
.finalized_tx
104111
.send(kind)
@@ -177,6 +184,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
177184
self.blob_sidecar_tx.subscribe()
178185
}
179186

187+
pub fn subscribe_data_column_sidecar(&self) -> Receiver<EventKind<E>> {
188+
self.data_column_sidecar_tx.subscribe()
189+
}
190+
180191
pub fn subscribe_finalized(&self) -> Receiver<EventKind<E>> {
181192
self.finalized_tx.subscribe()
182193
}
@@ -249,6 +260,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
249260
self.blob_sidecar_tx.receiver_count() > 0
250261
}
251262

263+
pub fn has_data_column_sidecar_subscribers(&self) -> bool {
264+
self.data_column_sidecar_tx.receiver_count() > 0
265+
}
266+
252267
pub fn has_finalized_subscribers(&self) -> bool {
253268
self.finalized_tx.receiver_count() > 0
254269
}

beacon_node/beacon_chain/src/test_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ pub const FORK_NAME_ENV_VAR: &str = "FORK_NAME";
7272

7373
// Pre-computed data column sidecar using a single static blob from:
7474
// `beacon_node/execution_layer/src/test_utils/fixtures/mainnet/test_blobs_bundle.ssz`
75-
const TEST_DATA_COLUMN_SIDECARS_SSZ: &[u8] =
75+
pub const TEST_DATA_COLUMN_SIDECARS_SSZ: &[u8] =
7676
include_bytes!("test_utils/fixtures/test_data_column_sidecars.ssz");
7777

7878
// Default target aggregators to set during testing, this ensures an aggregator at each slot.

beacon_node/beacon_chain/tests/events.rs

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
use beacon_chain::blob_verification::GossipVerifiedBlob;
2-
use beacon_chain::test_utils::BeaconChainHarness;
3-
use eth2::types::{EventKind, SseBlobSidecar};
2+
use beacon_chain::data_column_verification::GossipVerifiedDataColumn;
3+
use beacon_chain::test_utils::{BeaconChainHarness, TEST_DATA_COLUMN_SIDECARS_SSZ};
4+
use eth2::types::{EventKind, SseBlobSidecar, SseDataColumnSidecar};
45
use rand::rngs::StdRng;
56
use rand::SeedableRng;
67
use std::sync::Arc;
78
use types::blob_sidecar::FixedBlobSidecarList;
8-
use types::{BlobSidecar, EthSpec, ForkName, MinimalEthSpec};
9+
use types::test_utils::TestRandom;
10+
use types::{
11+
BlobSidecar, DataColumnSidecar, EthSpec, ForkName, MinimalEthSpec, RuntimeVariableList,
12+
};
913

1014
type E = MinimalEthSpec;
1115

@@ -43,6 +47,42 @@ async fn blob_sidecar_event_on_process_gossip_blob() {
4347
assert_eq!(sidecar_event, EventKind::BlobSidecar(expected_sse_blobs));
4448
}
4549

50+
/// Verifies that a data column event is emitted when a gossip verified data column is received via gossip or the publish block API.
51+
#[tokio::test]
52+
async fn data_column_sidecar_event_on_process_gossip_data_column() {
53+
let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
54+
let harness = BeaconChainHarness::builder(E::default())
55+
.spec(spec)
56+
.deterministic_keypairs(8)
57+
.fresh_ephemeral_store()
58+
.mock_execution_layer()
59+
.build();
60+
61+
// subscribe to blob sidecar events
62+
let event_handler = harness.chain.event_handler.as_ref().unwrap();
63+
let mut data_column_event_receiver = event_handler.subscribe_data_column_sidecar();
64+
65+
// build and process a gossip verified data column
66+
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
67+
let sidecar = Arc::new(DataColumnSidecar::random_for_test(&mut rng));
68+
let gossip_verified_data_column = GossipVerifiedDataColumn::__new_for_testing(sidecar);
69+
let expected_sse_data_column = SseDataColumnSidecar::from_data_column_sidecar(
70+
gossip_verified_data_column.as_data_column(),
71+
);
72+
73+
let _ = harness
74+
.chain
75+
.process_gossip_data_columns(vec![gossip_verified_data_column], || Ok(()))
76+
.await
77+
.unwrap();
78+
79+
let sidecar_event = data_column_event_receiver.try_recv().unwrap();
80+
assert_eq!(
81+
sidecar_event,
82+
EventKind::DataColumnSidecar(expected_sse_data_column)
83+
);
84+
}
85+
4686
/// Verifies that a blob event is emitted when blobs are received via RPC.
4787
#[tokio::test]
4888
async fn blob_sidecar_event_on_process_rpc_blobs() {
@@ -95,3 +135,41 @@ async fn blob_sidecar_event_on_process_rpc_blobs() {
95135
}
96136
assert_eq!(sse_blobs, expected_sse_blobs);
97137
}
138+
139+
#[tokio::test]
140+
async fn data_column_sidecar_event_on_process_rpc_columns() {
141+
let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
142+
let harness = BeaconChainHarness::builder(E::default())
143+
.spec(spec.clone())
144+
.deterministic_keypairs(8)
145+
.fresh_ephemeral_store()
146+
.mock_execution_layer()
147+
.build();
148+
149+
// subscribe to blob sidecar events
150+
let event_handler = harness.chain.event_handler.as_ref().unwrap();
151+
let mut data_column_event_receiver = event_handler.subscribe_data_column_sidecar();
152+
153+
// load the precomputed column sidecar to avoid computing them for every block in the tests.
154+
let mut sidecar = RuntimeVariableList::<DataColumnSidecar<E>>::from_ssz_bytes(
155+
TEST_DATA_COLUMN_SIDECARS_SSZ,
156+
spec.number_of_columns as usize,
157+
)
158+
.unwrap()[0]
159+
.clone();
160+
let parent_root = harness.chain.head().head_block_root();
161+
sidecar.signed_block_header.message.parent_root = parent_root;
162+
let expected_sse_data_column = SseDataColumnSidecar::from_data_column_sidecar(&sidecar);
163+
164+
let _ = harness
165+
.chain
166+
.process_rpc_custody_columns(vec![Arc::new(sidecar)])
167+
.await
168+
.unwrap();
169+
170+
let sidecar_event = data_column_event_receiver.try_recv().unwrap();
171+
assert_eq!(
172+
sidecar_event,
173+
EventKind::DataColumnSidecar(expected_sse_data_column)
174+
);
175+
}

beacon_node/http_api/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4741,6 +4741,9 @@ pub fn serve<T: BeaconChainTypes>(
47414741
api_types::EventTopic::BlobSidecar => {
47424742
event_handler.subscribe_blob_sidecar()
47434743
}
4744+
api_types::EventTopic::DataColumnSidecar => {
4745+
event_handler.subscribe_data_column_sidecar()
4746+
}
47444747
api_types::EventTopic::Attestation => {
47454748
event_handler.subscribe_attestation()
47464749
}

common/eth2/src/types.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -960,6 +960,35 @@ impl SseBlobSidecar {
960960
}
961961
}
962962

963+
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
964+
pub struct SseDataColumnSidecar {
965+
pub block_root: Hash256,
966+
#[serde(with = "serde_utils::quoted_u64")]
967+
pub index: u64,
968+
pub slot: Slot,
969+
pub kzg_commitments: Vec<KzgCommitment>,
970+
pub versioned_hashes: Vec<VersionedHash>,
971+
}
972+
973+
impl SseDataColumnSidecar {
974+
pub fn from_data_column_sidecar<E: EthSpec>(
975+
data_column_sidecar: &DataColumnSidecar<E>,
976+
) -> SseDataColumnSidecar {
977+
let kzg_commitments = data_column_sidecar.kzg_commitments.to_vec();
978+
let versioned_hashes = kzg_commitments
979+
.iter()
980+
.map(|c| c.calculate_versioned_hash())
981+
.collect();
982+
SseDataColumnSidecar {
983+
block_root: data_column_sidecar.block_root(),
984+
index: data_column_sidecar.index,
985+
slot: data_column_sidecar.slot(),
986+
kzg_commitments,
987+
versioned_hashes,
988+
}
989+
}
990+
}
991+
963992
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
964993
pub struct SseFinalizedCheckpoint {
965994
pub block: Hash256,
@@ -1110,6 +1139,7 @@ pub enum EventKind<E: EthSpec> {
11101139
SingleAttestation(Box<SingleAttestation>),
11111140
Block(SseBlock),
11121141
BlobSidecar(SseBlobSidecar),
1142+
DataColumnSidecar(SseDataColumnSidecar),
11131143
FinalizedCheckpoint(SseFinalizedCheckpoint),
11141144
Head(SseHead),
11151145
VoluntaryExit(SignedVoluntaryExit),
@@ -1133,6 +1163,7 @@ impl<E: EthSpec> EventKind<E> {
11331163
EventKind::Head(_) => "head",
11341164
EventKind::Block(_) => "block",
11351165
EventKind::BlobSidecar(_) => "blob_sidecar",
1166+
EventKind::DataColumnSidecar(_) => "data_column_sidecar",
11361167
EventKind::Attestation(_) => "attestation",
11371168
EventKind::SingleAttestation(_) => "single_attestation",
11381169
EventKind::VoluntaryExit(_) => "voluntary_exit",
@@ -1168,6 +1199,11 @@ impl<E: EthSpec> EventKind<E> {
11681199
"blob_sidecar" => Ok(EventKind::BlobSidecar(serde_json::from_str(data).map_err(
11691200
|e| ServerError::InvalidServerSentEvent(format!("Blob Sidecar: {:?}", e)),
11701201
)?)),
1202+
"data_column_sidecar" => Ok(EventKind::DataColumnSidecar(
1203+
serde_json::from_str(data).map_err(|e| {
1204+
ServerError::InvalidServerSentEvent(format!("Data Column Sidecar: {:?}", e))
1205+
})?,
1206+
)),
11711207
"chain_reorg" => Ok(EventKind::ChainReorg(serde_json::from_str(data).map_err(
11721208
|e| ServerError::InvalidServerSentEvent(format!("Chain Reorg: {:?}", e)),
11731209
)?)),
@@ -1257,6 +1293,7 @@ pub enum EventTopic {
12571293
Head,
12581294
Block,
12591295
BlobSidecar,
1296+
DataColumnSidecar,
12601297
Attestation,
12611298
SingleAttestation,
12621299
VoluntaryExit,
@@ -1283,6 +1320,7 @@ impl FromStr for EventTopic {
12831320
"head" => Ok(EventTopic::Head),
12841321
"block" => Ok(EventTopic::Block),
12851322
"blob_sidecar" => Ok(EventTopic::BlobSidecar),
1323+
"data_column_sidecar" => Ok(EventTopic::DataColumnSidecar),
12861324
"attestation" => Ok(EventTopic::Attestation),
12871325
"single_attestation" => Ok(EventTopic::SingleAttestation),
12881326
"voluntary_exit" => Ok(EventTopic::VoluntaryExit),
@@ -1310,6 +1348,7 @@ impl fmt::Display for EventTopic {
13101348
EventTopic::Head => write!(f, "head"),
13111349
EventTopic::Block => write!(f, "block"),
13121350
EventTopic::BlobSidecar => write!(f, "blob_sidecar"),
1351+
EventTopic::DataColumnSidecar => write!(f, "data_column_sidecar"),
13131352
EventTopic::Attestation => write!(f, "attestation"),
13141353
EventTopic::SingleAttestation => write!(f, "single_attestation"),
13151354
EventTopic::VoluntaryExit => write!(f, "voluntary_exit"),

0 commit comments

Comments
 (0)