Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sentry): base implementation for proposing blocks on a schedule #407

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
23 changes: 21 additions & 2 deletions deploy/local/docker-compose/vector-http-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ transforms:
beacon_api_eth_v1_beacon_validators: .event.name == "BEACON_API_ETH_V1_BEACON_VALIDATORS"
mev_relay_bid_trace_builder_block_submission: .event.name == "MEV_RELAY_BID_TRACE_BUILDER_BLOCK_SUBMISSION"
mev_relay_proposer_payload_delivered: .event.name == "MEV_RELAY_PROPOSER_PAYLOAD_DELIVERED"
beacon_api_eth_v3_validator_block: .event.name == "BEACON_API_ETH_V3_VALIDATOR_BLOCK"
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -890,7 +891,7 @@ sinks:
healthcheck:
enabled: true
encoding:
codec: json
codec: json
mev_relay_proposer_payload_delivered_kafka:
type: kafka
buffer:
Expand All @@ -906,4 +907,22 @@ sinks:
healthcheck:
enabled: true
encoding:
codec: json
codec: json
beacon_api_eth_v3_validator_block_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.beacon_api_eth_v3_validator_block
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: beacon-api-eth-v3-validator-block
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
librdkafka_options:
message.max.bytes: "10485760" # 10MB
122 changes: 115 additions & 7 deletions deploy/local/docker-compose/vector-kafka-clickhouse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,18 @@ sources:
- "^mev-relay-.+"
librdkafka_options:
message.max.bytes: "10485760" # 10MB
beacon_api_eth_v3_validator_block_kafka:
type: kafka
bootstrap_servers: "${KAFKA_BROKERS}"
auto_offset_reset: earliest
group_id: xatu-vector-kafka-clickhouse-beacon-api-eth-v3-validator-block-events
key_field: "event.id"
decoding:
codec: json
topics:
- "beacon-api-eth-v3-validator-block"
librdkafka_options:
message.max.bytes: "10485760" # 10MB
transforms:
xatu_server_events_meta:
type: remap
Expand All @@ -137,6 +149,7 @@ transforms:
- beacon_api_eth_v1_proposer_kafka
- beacon_api_eth_v1_beacon_validators_kafka
- mev_relay_kafka
- beacon_api_eth_v3_validator_block_kafka
source: |-
.meta_client_name = .meta.client.name
.meta_client_id = .meta.client.id
Expand Down Expand Up @@ -355,6 +368,7 @@ transforms:
mempool_transaction: .event.name == "MEMPOOL_TRANSACTION"
mev_relay_bid_trace_builder_block_submission: .event.name == "MEV_RELAY_BID_TRACE_BUILDER_BLOCK_SUBMISSION"
mev_relay_proposer_payload_delivered: .event.name == "MEV_RELAY_PROPOSER_PAYLOAD_DELIVERED"
eth_v3_validator_block: .event.name == "BEACON_API_ETH_V3_VALIDATOR_BLOCK"
xatu_server_events_router_matched:
type: log_to_metric
inputs:
Expand Down Expand Up @@ -396,6 +410,7 @@ transforms:
- xatu_server_events_router.mempool_transaction_v2
- xatu_server_events_router.mev_relay_bid_trace_builder_block_submission
- xatu_server_events_router.mev_relay_proposer_payload_delivered
- xatu_server_events_router.eth_v3_validator_block
metrics:
- type: counter
field: event.name
Expand Down Expand Up @@ -1645,14 +1660,14 @@ transforms:
.error_description = "failed to parse block epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}

.position_in_block = .meta.client.additional_data.position_in_block
.block_root = .meta.client.additional_data.block.root
.validators = .data.validator_indexes
.committee_index = .data.data.index
.beacon_block_root = .data.data.beacon_block_root
.slot = .data.data.slot

slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
if err == null {
.slot_start_date_time = to_unix_timestamp(slot_start_date_time)
Expand Down Expand Up @@ -1777,7 +1792,7 @@ transforms:

.slot = .data.slot
.block_number = .data.block_number

slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
if err == null {
.slot_start_date_time = to_unix_timestamp(slot_start_date_time)
Expand All @@ -1787,7 +1802,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.epoch = .meta.client.additional_data.epoch.number

epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
Expand Down Expand Up @@ -1838,7 +1853,7 @@ transforms:

del(.meta_consensus_implementation)
del(.meta_network_id)

del(.event)
del(.meta)
del(.data)
Expand Down Expand Up @@ -1869,7 +1884,7 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.epoch = .meta.client.additional_data.epoch.number

epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
Expand Down Expand Up @@ -1918,11 +1933,83 @@ transforms:

del(.meta_consensus_implementation)
del(.meta_network_id)



del(.event)
del(.meta)
del(.data)
beacon_api_eth_v3_validator_block_formatted:
type: remap
inputs:
- xatu_server_events_router.eth_v3_validator_block
source: |-
# handle message version name pathing and map it back to .data.message
if !exists(.data.message) {
message, err = get(value: .data, path: [.data.version])
if err == null {
.data.message = message
} else {
.error = err
.error_description = "failed to get data.message"
log(., level: "error", rate_limit_secs: 60)
}

cleanedUpData, err = remove(value: .data, path: [.data.version])
if err == null {
.data = cleanedUpData
} else {
.error = err
.error_description = "failed to remove data.message"
log(., level: "error", rate_limit_secs: 60)
}
}

event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}
.slot = .data.message.slot
slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
if err == null {
.slot_start_date_time = to_unix_timestamp(slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse slot start date time"
log(., level: "error", rate_limit_secs: 60)
}
.epoch = .meta.client.additional_data.epoch.number
epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}
.block_version = .meta.client.additional_data.version
.block_total_bytes = .meta.client.additional_data.total_bytes
.block_total_bytes_compressed = .meta.client.additional_data.total_bytes_compressed

.execution_payload_block_number = .data.message.body.execution_payload.block_number
.execution_payload_base_fee_per_gas = .data.message.body.execution_payload.base_fee_per_gas
.execution_payload_blob_gas_used = .data.message.body.execution_payload.blob_gas_used
.execution_payload_excess_blob_gas = .data.message.body.execution_payload.excess_blob_gas
.execution_payload_gas_limit = .data.message.body.execution_payload.gas_limit
.execution_payload_gas_used = .data.message.body.execution_payload.gas_used
.execution_payload_transactions_count = .meta.client.additional_data.transactions_count
.execution_payload_transactions_total_bytes = .meta.client.additional_data.transactions_total_bytes
.execution_payload_transactions_total_bytes_compressed = .meta.client.additional_data.transactions_total_bytes_compressed
.consensus_payload_value = .meta.client.additional_data.consensus_value
.execution_payload_value = .meta.client.additional_data.execution_value
.updated_date_time = to_unix_timestamp(now())
del(.event)
del(.meta)
del(.data)

sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -2551,3 +2638,24 @@ sinks:
healthcheck:
enabled: true
skip_unknown_fields: true
beacon_api_eth_v3_validator_block_clickhouse:
type: clickhouse
inputs:
- beacon_api_eth_v3_validator_block_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: beacon_api_eth_v3_validator_block
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false

2 changes: 2 additions & 0 deletions deploy/migrations/clickhouse/053_validator_block.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS beacon_api_eth_v3_validator_block ON CLUSTER '{cluster}' SYNC;
DROP TABLE IF EXISTS beacon_api_eth_v3_validator_block_local ON CLUSTER '{cluster}' SYNC;
68 changes: 68 additions & 0 deletions deploy/migrations/clickhouse/053_validator_block.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
-- Step 1: Create the new table.
CREATE TABLE default.beacon_api_eth_v3_validator_block_local ON CLUSTER '{cluster}' (
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`event_date_time` DateTime64(3) COMMENT 'When the sentry received the event from a beacon node' CODEC(DoubleDelta, ZSTD(1)),
`slot` UInt32 COMMENT 'Slot number within the payload' CODEC(DoubleDelta, ZSTD(1)),
`slot_start_date_time` DateTime COMMENT 'The wall clock time when the reorg slot started',
`epoch` UInt32 COMMENT 'The epoch number in the beacon API event stream payload' CODEC(DoubleDelta, ZSTD(1)),
`epoch_start_date_time` DateTime COMMENT 'The wall clock time when the epoch started' CODEC(DoubleDelta, ZSTD(1)),
`block_version` LowCardinality(String) COMMENT 'The version of the beacon block',
`block_total_bytes` Nullable(UInt32) COMMENT 'The total bytes of the beacon block payload' CODEC(ZSTD(1)),
`block_total_bytes_compressed` Nullable(UInt32) COMMENT 'The total bytes of the beacon block payload when compressed using snappy' CODEC(ZSTD(1)),
`consensus_payload_value` Nullable(UInt64) COMMENT 'Consensus rewards paid to the proposer for this block, in Wei. Use to determine relative value of consensus blocks.' CODEC(ZSTD(1)),
`execution_payload_value` Nullable(UInt64) COMMENT 'Execution payload value in Wei. Use to determine relative value of execution payload.' CODEC(ZSTD(1)),
`execution_payload_block_number` UInt32 COMMENT 'The block number of the execution payload',
`execution_payload_base_fee_per_gas` Nullable(UInt128) COMMENT 'Base fee per gas for execution payload' CODEC(ZSTD(1)),
`execution_payload_blob_gas_used` Nullable(UInt64) COMMENT 'Gas used for blobs in execution payload' CODEC(ZSTD(1)),
`execution_payload_excess_blob_gas` Nullable(UInt64) COMMENT 'Excess gas used for blobs in execution payload' CODEC(ZSTD(1)),
`execution_payload_gas_limit` Nullable(UInt64) COMMENT 'Gas limit for execution payload' CODEC(DoubleDelta, ZSTD(1)),
`execution_payload_gas_used` Nullable(UInt64) COMMENT 'Gas used for execution payload' CODEC(ZSTD(1)),
`execution_payload_transactions_count` Nullable(UInt32) COMMENT 'The transaction count of the execution payload' CODEC(ZSTD(1)),
`execution_payload_transactions_total_bytes` Nullable(UInt32) COMMENT 'The transaction total bytes of the execution payload' CODEC(ZSTD(1)),
`execution_payload_transactions_total_bytes_compressed` Nullable(UInt32) COMMENT 'The transaction total bytes of the execution payload when compressed using snappy' CODEC(ZSTD(1)),
`meta_client_name` LowCardinality(String) COMMENT 'Name of the client that generated the event',
`meta_client_id` String COMMENT 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.' CODEC(ZSTD(1)),
`meta_client_version` LowCardinality(String) COMMENT 'Version of the client that generated the event',
`meta_client_implementation` LowCardinality(String) COMMENT 'Implementation of the client that generated the event',
`meta_client_os` LowCardinality(String) COMMENT 'Operating system of the client that generated the event',
`meta_client_ip` Nullable(IPv6) COMMENT 'IP address of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_city` LowCardinality(String) COMMENT 'City of the client that generated the event',
`meta_client_geo_country` LowCardinality(String) COMMENT 'Country of the client that generated the event',
`meta_client_geo_country_code` LowCardinality(String) COMMENT 'Country code of the client that generated the event',
`meta_client_geo_continent_code` LowCardinality(String) COMMENT 'Continent code of the client that generated the event',
`meta_client_geo_longitude` Nullable(Float64) COMMENT 'Longitude of the client that generated the event',
`meta_client_geo_latitude` Nullable(Float64) COMMENT 'Latitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_number` Nullable(UInt32) COMMENT 'Autonomous system number of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_organization` Nullable(String) COMMENT 'Autonomous system organization of the client that generated the event' CODEC(ZSTD(1)),
`meta_network_id` Int32 COMMENT 'Ethereum network ID' CODEC(DoubleDelta, ZSTD(1)),
`meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name',
`meta_consensus_version` LowCardinality(String) COMMENT 'Ethereum consensus client version that generated the event',
`meta_consensus_version_major` LowCardinality(String) COMMENT 'Ethereum consensus client major version that generated the event',
`meta_consensus_version_minor` LowCardinality(String) COMMENT 'Ethereum consensus client minor version that generated the event',
`meta_consensus_version_patch` LowCardinality(String) COMMENT 'Ethereum consensus client patch version that generated the event',
`meta_consensus_implementation` LowCardinality(String) COMMENT 'Ethereum consensus client implementation that generated the event',
`meta_labels` Map(String, String) COMMENT 'Labels associated with the event' CODEC(ZSTD(1))
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/{database}/tables/{table}/{shard}',
'{replica}',
updated_date_time
) PARTITION BY toStartOfMonth(slot_start_date_time)
ORDER BY
(
slot_start_date_time,
meta_network_name,
meta_client_name
)
COMMENT 'Contains beacon API /eth/v3/validator/blocks/{slot} data from each sentry client attached to a beacon node.';

-- Step 2: Create the distributed table.
CREATE TABLE default.beacon_api_eth_v3_validator_block ON CLUSTER '{cluster}' AS default.beacon_api_eth_v3_validator_block_local ENGINE = Distributed(
'{cluster}',
default,
beacon_api_eth_v3_validator_block_local,
cityHash64(
slot_start_date_time,
meta_network_name,
meta_client_name
)
);
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ services:
"beacon-api-eth-v1-beacon-validators"
"mev-relay-bid-trace-builder-block-submission"
"mev-relay-proposer-payload-delivered"
"beacon-api-eth-v3-validator-block"
)
for topic in "$${topics[@]}"; do
echo "Creating topic: $$topic";
Expand Down Expand Up @@ -565,7 +566,7 @@ services:
- ./deploy/local/docker-compose/xatu-cannon.yaml:/etc/cannon/config.yaml
networks:
- xatu-net

networks:
xatu-net:
driver: bridge
Expand Down
12 changes: 12 additions & 0 deletions example_sentry.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ attestationData:
beaconCommittees:
enabled: true

validatorBlock:
enabled: false

interval:
enabled: false
every: 30s

at:
enabled: false
slotTimes:
- 4s

outputs:
- name: http-sink
type: http
Expand Down
Loading
Loading