Skip to content

Commit

Permalink
move metered-channel to mysten-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
longbowlu committed May 23, 2023
1 parent 012dfba commit c21861d
Show file tree
Hide file tree
Showing 33 changed files with 87 additions and 53 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/mysten-metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ dashmap = "5.4.0"
uuid = { version = "1.1.2", features = ["v4", "fast-rng"]}
parking_lot = "0.12.1"
futures = "0.3.23"
async-trait = "0.1.61"

prometheus-closure-metric = { path = "../prometheus-closure-metric" }
9 changes: 9 additions & 0 deletions crates/mysten-metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use uuid::Uuid;

mod guards;
pub mod histogram;
pub mod metered_channel;
pub use guards::*;

pub const TX_TYPE_SINGLE_WRITER_TX: &str = "single_writer";
Expand All @@ -27,6 +28,7 @@ pub const TX_TYPE_SHARED_OBJ_TX: &str = "shared_object";
pub struct Metrics {
pub tasks: IntGaugeVec,
pub futures: IntGaugeVec,
pub channels: IntGaugeVec,
pub scope_iterations: IntGaugeVec,
pub scope_duration_ns: IntGaugeVec,
pub scope_entrance: IntGaugeVec,
Expand All @@ -49,6 +51,13 @@ impl Metrics {
registry,
)
.unwrap(),
channels: register_int_gauge_vec_with_registry!(
"monitored_channels",
"Size of channels.",
&["name"],
registry,
)
.unwrap(),
scope_entrance: register_int_gauge_vec_with_registry!(
"monitored_scope_entrance",
"Number of entrance in the scope.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ impl<T> From<Receiver<T>> for ReceiverStream<T> {
}

// TODO: facade PollSender
// TODO: add prom metrics reporting for gauge and migrate all existing use cases.

////////////////////////////////////////////////////////////////
/// Constructor
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl SubscriptionHandler {
trace!(
num_events = events.data.len(),
tx_digest =? effects.transaction_digest(),
"Finished writing events to event store"
"Processing tx/event subscription"
);

if let Err(e) = self
Expand Down
27 changes: 23 additions & 4 deletions crates/sui-core/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,29 @@

use crate::event_handler::EVENT_DISPATCH_BUFFER_SIZE;
use futures::Stream;
use mysten_metrics::metered_channel::Sender;
use mysten_metrics::spawn_monitored_task;
use parking_lot::RwLock;
use prometheus::{IntGauge, Registry};
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;
use sui_json_rpc_types::Filter;
use sui_types::base_types::ObjectID;
use sui_types::error::SuiError;
use tap::TapFallible;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, warn};

type Subscribers<T, F> = Arc<RwLock<BTreeMap<String, (Sender<T>, F)>>>;
type Subscribers<T, F> = Arc<RwLock<BTreeMap<String, (tokio::sync::mpsc::Sender<T>, F)>>>;

/// The Streamer splits a mpsc channel into multiple mpsc channels using the subscriber's `Filter<T>` object.
/// Data will be sent to the subscribers in parallel and the subscription will be dropped if it received a send error.
pub struct Streamer<T, S, F: Filter<T>> {
streamer_queue: Sender<T>,
subscribers: Subscribers<S, F>,
gauge: IntGauge,
}

impl<T, S, F> Streamer<T, S, F>
Expand All @@ -32,16 +35,31 @@ where
F: Filter<T> + Clone + Send + Sync + 'static + Clone,
{
pub fn spawn(buffer: usize) -> Self {
let (tx, rx) = mpsc::channel::<T>(buffer);
let gauge = if let Some(metrics) = mysten_metrics::get_metrics() {
metrics.channels.with_label_values(&["streamer"])
} else {
// We call init_metrics very early when starting a node. Therefore when this happens,
// it's probably in a test.
mysten_metrics::init_metrics(&Registry::default());
mysten_metrics::get_metrics()
.unwrap()
.channels
.with_label_values(&["streamer"])
};

let (tx, rx) = mysten_metrics::metered_channel::channel(buffer, &gauge);
let gague_clone = gauge.clone();
let streamer = Self {
streamer_queue: tx,
subscribers: Default::default(),
gauge,
};
let mut rx = rx;
let subscribers = streamer.subscribers.clone();
spawn_monitored_task!(async move {
while let Some(data) = rx.recv().await {
Self::send_to_all_subscribers(subscribers.clone(), data).await;
gague_clone.dec();
}
});
streamer
Expand Down Expand Up @@ -81,8 +99,9 @@ where
self.streamer_queue
.send(data)
.await
.map_err(|e| SuiError::EventFailedToDispatch {
.map_err(|e| SuiError::FailedToDispatchSubscription {
error: e.to_string(),
})
.tap_ok(|_| self.gauge.inc())
}
}
4 changes: 2 additions & 2 deletions crates/sui-types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,8 @@ pub enum SuiError {
#[error("Authority Error: {error:?}")]
GenericAuthorityError { error: String },

#[error("Failed to dispatch event: {error:?}")]
EventFailedToDispatch { error: String },
#[error("Failed to dispatch subscription: {error:?}")]
FailedToDispatchSubscription { error: String },

#[error("Failed to serialize Owner: {error:?}")]
OwnerFailedToSerialize { error: String },
Expand Down
5 changes: 3 additions & 2 deletions narwhal/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::utils::gc_round;
use crate::{metrics::ConsensusMetrics, ConsensusError, Outcome, SequenceNumber};
use config::{AuthorityIdentifier, Committee};
use fastcrypto::hash::Hash;
use mysten_metrics::metered_channel;
use mysten_metrics::spawn_logged_monitored_task;
use std::{
cmp::{max, Ordering},
Expand All @@ -18,8 +19,8 @@ use storage::{CertificateStore, ConsensusStore};
use tokio::{sync::watch, task::JoinHandle};
use tracing::{debug, info, instrument};
use types::{
metered_channel, Certificate, CertificateAPI, CertificateDigest, CommittedSubDag,
ConditionalBroadcastReceiver, ConsensusCommit, HeaderAPI, Round, Timestamp,
Certificate, CertificateAPI, CertificateDigest, CommittedSubDag, ConditionalBroadcastReceiver,
ConsensusCommit, HeaderAPI, Round, Timestamp,
};

#[cfg(test)]
Expand Down
3 changes: 2 additions & 1 deletion narwhal/consensus/src/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use config::{AuthorityIdentifier, Committee};
use dag::node_dag::{NodeDag, NodeDagError};
use fastcrypto::hash::Hash;
use mysten_metrics::metered_channel;
use mysten_metrics::spawn_logged_monitored_task;
use std::{
borrow::Borrow,
Expand All @@ -20,7 +21,7 @@ use tokio::{
task::JoinHandle,
};
use tracing::instrument;
use types::{metered_channel, Certificate, CertificateDigest, ConditionalBroadcastReceiver, Round};
use types::{Certificate, CertificateDigest, ConditionalBroadcastReceiver, Round};

use crate::{metrics::ConsensusMetrics, DEFAULT_CHANNEL_SIZE};

Expand Down
6 changes: 2 additions & 4 deletions narwhal/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@ use crate::subscriber::spawn_subscriber;
use async_trait::async_trait;
use config::{AuthorityIdentifier, Committee, WorkerCache};
use mockall::automock;
use mysten_metrics::metered_channel;
use network::client::NetworkClient;
use prometheus::Registry;
use std::sync::Arc;
use storage::{CertificateStore, ConsensusStore};
use tokio::task::JoinHandle;
use tracing::info;
use types::{
metered_channel, CertificateDigest, CommittedSubDag, ConditionalBroadcastReceiver,
ConsensusOutput,
};
use types::{CertificateDigest, CommittedSubDag, ConditionalBroadcastReceiver, ConsensusOutput};

/// Convenience type representing a serialized transaction.
pub type SerializedTransaction = Vec<u8>;
Expand Down
3 changes: 2 additions & 1 deletion narwhal/executor/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ use std::{sync::Arc, time::Duration, vec};
use types::FetchBatchesRequest;

use fastcrypto::hash::Hash;
use mysten_metrics::metered_channel;
use mysten_metrics::spawn_logged_monitored_task;
use tokio::task::JoinHandle;
use tracing::{debug, error, info};
use types::{
metered_channel, Batch, BatchAPI, BatchDigest, Certificate, CertificateAPI, CommittedSubDag,
Batch, BatchAPI, BatchDigest, Certificate, CertificateAPI, CommittedSubDag,
ConditionalBroadcastReceiver, ConsensusOutput, HeaderAPI, Timestamp,
};

Expand Down
5 changes: 2 additions & 3 deletions narwhal/node/src/primary_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use consensus::Consensus;
use crypto::{KeyPair, NetworkKeyPair, PublicKey};
use executor::{get_restored_consensus_output, ExecutionState, Executor, SubscriberResult};
use fastcrypto::traits::{KeyPair as _, VerifyingKey};
use mysten_metrics::metered_channel;
use mysten_metrics::{RegistryID, RegistryService};
use network::client::NetworkClient;
use primary::{NetworkModel, Primary, PrimaryChannelMetrics, NUM_SHUTDOWN_RECEIVERS};
Expand All @@ -22,9 +23,7 @@ use storage::NodeStorage;
use tokio::sync::{watch, RwLock};
use tokio::task::JoinHandle;
use tracing::{debug, info, instrument};
use types::{
metered_channel, Certificate, ConditionalBroadcastReceiver, PreSubscribedBroadcastSender, Round,
};
use types::{Certificate, ConditionalBroadcastReceiver, PreSubscribedBroadcastSender, Round};

struct PrimaryNodeInner {
// The configuration parameters.
Expand Down
4 changes: 2 additions & 2 deletions narwhal/primary/src/block_remover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use std::{collections::HashMap, sync::Arc};
use storage::{CertificateStore, HeaderStore, PayloadStore};
use store::rocks::TypedStoreError;

use mysten_metrics::metered_channel::Sender;
use tracing::{debug, instrument, warn};
use types::{
metered_channel::Sender, BatchDigest, Certificate, CertificateAPI, CertificateDigest,
HeaderAPI, HeaderDigest, Round,
BatchDigest, Certificate, CertificateAPI, CertificateDigest, HeaderAPI, HeaderDigest, Round,
};

#[cfg(test)]
Expand Down
3 changes: 2 additions & 1 deletion narwhal/primary/src/block_synchronizer/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use fastcrypto::hash::Hash;
use futures::future::join_all;
#[cfg(test)]
use mockall::*;
use mysten_metrics::metered_channel;
use std::time::Duration;
use storage::CertificateStore;
use thiserror::Error;
Expand All @@ -20,7 +21,7 @@ use tokio::{
time::timeout,
};
use tracing::{debug, error, instrument, trace};
use types::{metered_channel, Certificate, CertificateDigest};
use types::{Certificate, CertificateDigest};

#[cfg(test)]
#[path = "tests/handler_tests.rs"]
Expand Down
3 changes: 2 additions & 1 deletion narwhal/primary/src/block_synchronizer/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
// SPDX-License-Identifier: Apache-2.0
use crate::block_synchronizer::{BlockHeader, BlockSynchronizeResult, Command};
use fastcrypto::hash::Hash;
use mysten_metrics::metered_channel;
use prometheus::IntGauge;
use std::collections::HashMap;
use tokio::sync::oneshot;
use types::{metered_channel, CertificateDigest};
use types::CertificateDigest;

#[derive(Debug)]
enum Core {
Expand Down
7 changes: 4 additions & 3 deletions narwhal/primary/src/block_synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use futures::{
stream::FuturesUnordered,
FutureExt, StreamExt,
};
use mysten_metrics::metered_channel;
use mysten_metrics::{monitored_future, spawn_logged_monitored_task};
use network::anemo_ext::NetworkExt;
use network::UnreliableNetwork;
Expand All @@ -31,9 +32,9 @@ use thiserror::Error;
use tokio::{sync::mpsc::Sender, task::JoinHandle, time::timeout};
use tracing::{debug, error, info, instrument, trace, warn};
use types::{
metered_channel, BatchDigest, Certificate, CertificateAPI, CertificateDigest,
ConditionalBroadcastReceiver, GetCertificatesRequest, HeaderAPI, PayloadAvailabilityRequest,
PrimaryToPrimaryClient, WorkerSynchronizeMessage,
BatchDigest, Certificate, CertificateAPI, CertificateDigest, ConditionalBroadcastReceiver,
GetCertificatesRequest, HeaderAPI, PayloadAvailabilityRequest, PrimaryToPrimaryClient,
WorkerSynchronizeMessage,
};

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion narwhal/primary/src/certificate_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use consensus::consensus::ConsensusRound;
use crypto::NetworkPublicKey;
use futures::{stream::FuturesUnordered, StreamExt};
use itertools::Itertools;
use mysten_metrics::metered_channel::Receiver;
use mysten_metrics::{monitored_future, monitored_scope, spawn_logged_monitored_task};
use network::PrimaryToPrimaryRpc;
use rand::{rngs::ThreadRng, seq::SliceRandom};
Expand All @@ -26,7 +27,6 @@ use tokio::{
use tracing::{debug, error, instrument, trace, warn};
use types::{
error::{DagError, DagResult},
metered_channel::Receiver,
Certificate, CertificateAPI, ConditionalBroadcastReceiver, FetchCertificatesRequest,
FetchCertificatesResponse, HeaderAPI, Round,
};
Expand Down
2 changes: 1 addition & 1 deletion narwhal/primary/src/certifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crypto::{NetworkPublicKey, Signature};
use fastcrypto::signature_service::SignatureService;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use mysten_metrics::metered_channel::Receiver;
use mysten_metrics::{monitored_future, spawn_logged_monitored_task};
use network::anemo_ext::NetworkExt;
use std::sync::Arc;
Expand All @@ -22,7 +23,6 @@ use tracing::{debug, enabled, error, info, instrument, warn};
use types::{
ensure,
error::{DagError, DagResult},
metered_channel::Receiver,
Certificate, CertificateDigest, ConditionalBroadcastReceiver, Header, HeaderAPI,
PrimaryToPrimaryClient, RequestVoteRequest, Vote, VoteAPI,
};
Expand Down
2 changes: 1 addition & 1 deletion narwhal/primary/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use fastcrypto::{
signature_service::SignatureService,
traits::{KeyPair as _, ToFromBytes},
};
use mysten_metrics::metered_channel::{channel_with_total, Receiver, Sender};
use mysten_metrics::{monitored_scope, spawn_monitored_task};
use mysten_network::{multiaddr::Protocol, Multiaddr};
use network::{
Expand Down Expand Up @@ -65,7 +66,6 @@ use tracing::{debug, error, info, instrument, warn};
use types::{
ensure,
error::{DagError, DagResult},
metered_channel::{channel_with_total, Receiver, Sender},
now, Certificate, CertificateAPI, CertificateDigest, FetchCertificatesRequest,
FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header, HeaderAPI,
PayloadAvailabilityRequest, PayloadAvailabilityResponse, PreSubscribedBroadcastSender,
Expand Down
2 changes: 1 addition & 1 deletion narwhal/primary/src/proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use crate::{metrics::PrimaryMetrics, NetworkModel};
use config::{AuthorityIdentifier, Committee, Epoch, WorkerId};
use fastcrypto::hash::Hash as _;
use mysten_metrics::metered_channel::{Receiver, Sender};
use mysten_metrics::spawn_logged_monitored_task;
use std::collections::{BTreeMap, VecDeque};
use std::{cmp::Ordering, sync::Arc};
Expand All @@ -17,7 +18,6 @@ use tokio::{
use tracing::{debug, enabled, error, info, trace};
use types::{
error::{DagError, DagResult},
metered_channel::{Receiver, Sender},
BatchDigest, Certificate, CertificateAPI, Header, HeaderAPI, Round, TimestampMs,
};
use types::{now, ConditionalBroadcastReceiver};
Expand Down
6 changes: 2 additions & 4 deletions narwhal/primary/src/state_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use config::AuthorityIdentifier;
use mysten_metrics::metered_channel::{Receiver, Sender};
use mysten_metrics::spawn_logged_monitored_task;
use tap::TapFallible;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
use types::{
metered_channel::{Receiver, Sender},
Certificate, CertificateAPI, ConditionalBroadcastReceiver, HeaderAPI, Round,
};
use types::{Certificate, CertificateAPI, ConditionalBroadcastReceiver, HeaderAPI, Round};

/// Receives the highest round reached by consensus and update it for all tasks.
pub struct StateHandler {
Expand Down
Loading

0 comments on commit c21861d

Please sign in to comment.