Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 2654579

Browse files
committed
Merge remote-tracking branch 'origin/master' into slumber-delay-upgrade-from-inclusion
2 parents 18f8932 + e2a3435 commit 2654579

File tree

13 files changed

+269
-16
lines changed

13 files changed

+269
-16
lines changed

.gitlab-ci.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ variables:
3333
GIT_STRATEGY: fetch
3434
GIT_DEPTH: 100
3535
CI_SERVER_NAME: "GitLab CI"
36-
CI_IMAGE: "paritytech/ci-linux:production"
36+
CI_IMAGE: "paritytech/ci-unified:bullseye-1.70.0-2023-05-23"
3737
BUILDAH_IMAGE: "quay.io/buildah/stable:v1.29"
3838
BUILDAH_COMMAND: "buildah --storage-driver overlay2"
3939
DOCKER_OS: "debian:stretch"

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/gum/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ edition.workspace = true
66
description = "Stick logs together with the TraceID as provided by tempo"
77

88
[dependencies]
9+
coarsetime = "0.1.22"
910
tracing = "0.1.35"
1011
jaeger = { path = "../jaeger", package = "polkadot-node-jaeger" }
1112
gum-proc-macro = { path = "./proc-macro", package = "tracing-gum-proc-macro" }

node/gum/proc-macro/src/lib.rs

+21
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,27 @@ pub fn warn(item: proc_macro::TokenStream) -> proc_macro::TokenStream {
4343
gum(item, Level::Warn)
4444
}
4545

46+
/// Print a warning or debug level message depending on their frequency
47+
#[proc_macro]
48+
pub fn warn_if_frequent(item: proc_macro::TokenStream) -> proc_macro::TokenStream {
49+
let ArgsIfFrequent { freq, max_rate, rest } = parse2(item.into()).unwrap();
50+
51+
let freq_expr = freq.expr;
52+
let max_rate_expr = max_rate.expr;
53+
let debug: proc_macro2::TokenStream = gum(rest.clone().into(), Level::Debug).into();
54+
let warn: proc_macro2::TokenStream = gum(rest.into(), Level::Warn).into();
55+
56+
let stream = quote! {
57+
if #freq_expr .is_frequent(#max_rate_expr) {
58+
#warn
59+
} else {
60+
#debug
61+
}
62+
};
63+
64+
stream.into()
65+
}
66+
4667
/// Print a info level message.
4768
#[proc_macro]
4869
pub fn info(item: proc_macro::TokenStream) -> proc_macro::TokenStream {

node/gum/proc-macro/src/types.rs

+46
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ use syn::{
2323

2424
pub(crate) mod kw {
2525
syn::custom_keyword!(target);
26+
syn::custom_keyword!(freq);
27+
syn::custom_keyword!(max_rate);
2628
}
2729

2830
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -248,6 +250,50 @@ impl ToTokens for FmtGroup {
248250
}
249251
}
250252

253+
#[derive(Debug, Clone, PartialEq, Eq)]
254+
pub(crate) struct Freq {
255+
kw: kw::freq,
256+
colon: Token![:],
257+
pub expr: syn::Expr,
258+
}
259+
260+
impl Parse for Freq {
261+
fn parse(input: ParseStream) -> Result<Self> {
262+
Ok(Self { kw: input.parse()?, colon: input.parse()?, expr: input.parse()? })
263+
}
264+
}
265+
266+
#[derive(Debug, Clone, PartialEq, Eq)]
267+
pub(crate) struct MaxRate {
268+
kw: kw::max_rate,
269+
colon: Token![:],
270+
pub expr: syn::Expr,
271+
}
272+
273+
impl Parse for MaxRate {
274+
fn parse(input: ParseStream) -> Result<Self> {
275+
Ok(Self { kw: input.parse()?, colon: input.parse()?, expr: input.parse()? })
276+
}
277+
}
278+
279+
pub(crate) struct ArgsIfFrequent {
280+
pub freq: Freq,
281+
pub max_rate: MaxRate,
282+
pub rest: TokenStream,
283+
}
284+
285+
impl Parse for ArgsIfFrequent {
286+
fn parse(input: ParseStream) -> Result<Self> {
287+
let freq = input.parse()?;
288+
let _: Token![,] = input.parse()?;
289+
let max_rate = input.parse()?;
290+
let _: Token![,] = input.parse()?;
291+
let rest = input.parse()?;
292+
293+
Ok(Self { freq, max_rate, rest })
294+
}
295+
}
296+
251297
/// Full set of arguments as provided to the `gum::warn!` call.
252298
#[derive(Debug, Clone)]
253299
pub(crate) struct Args {

node/gum/src/lib.rs

+78-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,84 @@ pub use jaeger::hash_to_trace_identifier;
112112
#[doc(hidden)]
113113
pub use polkadot_primitives::{CandidateHash, Hash};
114114

115-
pub use gum_proc_macro::{debug, error, info, trace, warn};
115+
pub use gum_proc_macro::{debug, error, info, trace, warn, warn_if_frequent};
116116

117117
#[cfg(test)]
118118
mod tests;
119+
120+
const FREQ_SMOOTHING_FACTOR: f32 = 0.5;
121+
122+
/// Exponential moving average
123+
#[derive(Debug, Default)]
124+
struct EmaBucket {
125+
current: f32,
126+
count: u32,
127+
}
128+
129+
impl EmaBucket {
130+
fn update(&mut self, value: f32, alpha: f32) {
131+
if self.count == 0 {
132+
self.current = value;
133+
} else {
134+
self.current += alpha * (value - self.current);
135+
}
136+
self.count += 1;
137+
}
138+
}
139+
140+
/// Utility struct to compare the rate of its own calls.
141+
pub struct Freq {
142+
ema: EmaBucket,
143+
last: u64,
144+
}
145+
146+
impl Freq {
147+
/// Initiates a new instance
148+
pub fn new() -> Self {
149+
Self { ema: Default::default(), last: Default::default() }
150+
}
151+
152+
/// Compares the rate of its own calls with the passed one.
153+
pub fn is_frequent(&mut self, max_rate: Times) -> bool {
154+
self.record();
155+
156+
// Two attempts is not enough to call something as frequent.
157+
if self.ema.count < 3 {
158+
return false
159+
}
160+
161+
let rate = 1000.0 / self.ema.current; // Current EMA represents interval in ms
162+
rate > max_rate.into()
163+
}
164+
165+
fn record(&mut self) {
166+
let now = coarsetime::Clock::now_since_epoch().as_millis() as u64;
167+
if self.last > 0 {
168+
self.ema.update((now - self.last) as f32, FREQ_SMOOTHING_FACTOR);
169+
}
170+
self.last = now;
171+
}
172+
}
173+
174+
/// Represents frequency per second, minute, hour and day
175+
pub enum Times {
176+
/// Per second
177+
PerSecond(u32),
178+
/// Per minute
179+
PerMinute(u32),
180+
/// Per hour
181+
PerHour(u32),
182+
/// Per day
183+
PerDay(u32),
184+
}
185+
186+
impl From<Times> for f32 {
187+
fn from(value: Times) -> Self {
188+
match value {
189+
Times::PerSecond(v) => v as f32,
190+
Times::PerMinute(v) => v as f32 / 60.0,
191+
Times::PerHour(v) => v as f32 / (60.0 * 60.0),
192+
Times::PerDay(v) => v as f32 / (60.0 * 60.0 * 24.0),
193+
}
194+
}
195+
}

node/gum/src/tests.rs

+62
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,21 @@ fn wo_unnecessary() {
5050
);
5151
}
5252

53+
#[test]
54+
fn if_frequent() {
55+
let a: i32 = 7;
56+
let mut f = Freq::new();
57+
warn_if_frequent!(
58+
freq: f,
59+
max_rate: Times::PerSecond(1),
60+
target: "bar",
61+
a = a,
62+
b = ?Y::default(),
63+
"fff {c}",
64+
c = a,
65+
);
66+
}
67+
5368
#[test]
5469
fn w_candidate_hash_value_assignment() {
5570
let a: i32 = 7;
@@ -102,3 +117,50 @@ fn w_candidate_hash_aliased_unnecessary() {
102117
"xxx",
103118
);
104119
}
120+
121+
#[test]
122+
fn frequent_at_fourth_time() {
123+
let mut freq = Freq::new();
124+
125+
assert!(!freq.is_frequent(Times::PerSecond(1)));
126+
assert!(!freq.is_frequent(Times::PerSecond(1)));
127+
assert!(!freq.is_frequent(Times::PerSecond(1)));
128+
129+
assert!(freq.is_frequent(Times::PerSecond(1)));
130+
}
131+
132+
#[test]
133+
fn not_frequent_at_fourth_time_if_slow() {
134+
let mut freq = Freq::new();
135+
136+
assert!(!freq.is_frequent(Times::PerSecond(1000)));
137+
assert!(!freq.is_frequent(Times::PerSecond(1000)));
138+
assert!(!freq.is_frequent(Times::PerSecond(1000)));
139+
140+
std::thread::sleep(std::time::Duration::from_millis(10));
141+
assert!(!freq.is_frequent(Times::PerSecond(1000)));
142+
}
143+
144+
#[test]
145+
fn calculate_rate_per_second() {
146+
let rate: f32 = Times::PerSecond(100).into();
147+
assert_eq!(rate, 100.0)
148+
}
149+
150+
#[test]
151+
fn calculate_rate_per_minute() {
152+
let rate: f32 = Times::PerMinute(100).into();
153+
assert_eq!(rate, 1.6666666)
154+
}
155+
156+
#[test]
157+
fn calculate_rate_per_hour() {
158+
let rate: f32 = Times::PerHour(100).into();
159+
assert_eq!(rate, 0.027777778)
160+
}
161+
162+
#[test]
163+
fn calculate_rate_per_day() {
164+
let rate: f32 = Times::PerDay(100).into();
165+
assert_eq!(rate, 0.0011574074)
166+
}

node/network/availability-distribution/src/error.rs

+7-2
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,11 @@ pub type Result<T> = std::result::Result<T, Error>;
9191
///
9292
/// We basically always want to try and continue on error. This utility function is meant to
9393
/// consume top-level errors by simply logging them
94-
pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<(), FatalError> {
94+
pub fn log_error(
95+
result: Result<()>,
96+
ctx: &'static str,
97+
warn_freq: &mut gum::Freq,
98+
) -> std::result::Result<(), FatalError> {
9599
match result.into_nested()? {
96100
Ok(()) => Ok(()),
97101
Err(jfyi) => {
@@ -104,7 +108,8 @@ pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<(
104108
JfyiError::FetchPoV(_) |
105109
JfyiError::SendResponse |
106110
JfyiError::NoSuchPoV |
107-
JfyiError::Runtime(_) => gum::debug!(target: LOG_TARGET, error = ?jfyi, ctx),
111+
JfyiError::Runtime(_) =>
112+
gum::warn_if_frequent!(freq: warn_freq, max_rate: gum::Times::PerHour(100), target: LOG_TARGET, error = ?jfyi, ctx),
108113
}
109114
Ok(())
110115
},

node/network/availability-distribution/src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ impl AvailabilityDistributionSubsystem {
9797

9898
let IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver } = recvs;
9999
let mut requester = Requester::new(metrics.clone()).fuse();
100+
let mut warn_freq = gum::Freq::new();
100101

101102
{
102103
let sender = ctx.sender().clone();
@@ -147,6 +148,7 @@ impl AvailabilityDistributionSubsystem {
147148
.update_fetching_heads(&mut ctx, &mut runtime, update, &spans)
148149
.await,
149150
"Error in Requester::update_fetching_heads",
151+
&mut warn_freq,
150152
)?;
151153
},
152154
FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, _)) => {
@@ -188,6 +190,7 @@ impl AvailabilityDistributionSubsystem {
188190
)
189191
.await,
190192
"pov_requester::fetch_pov",
193+
&mut warn_freq,
191194
)?;
192195
},
193196
}

node/network/availability-distribution/src/requester/fetch_task/mod.rs

+14-3
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,8 @@ impl RunningTask {
260260
let mut succeeded = false;
261261
let mut count: u32 = 0;
262262
let mut span = self.span.child("run-fetch-chunk-task").with_relay_parent(self.relay_parent);
263+
let mut network_error_freq = gum::Freq::new();
264+
let mut canceled_freq = gum::Freq::new();
263265
// Try validators in reverse order:
264266
while let Some(validator) = self.group.pop() {
265267
// Report retries:
@@ -272,7 +274,10 @@ impl RunningTask {
272274
.with_chunk_index(self.request.index.0)
273275
.with_stage(jaeger::Stage::AvailabilityDistribution);
274276
// Send request:
275-
let resp = match self.do_request(&validator).await {
277+
let resp = match self
278+
.do_request(&validator, &mut network_error_freq, &mut canceled_freq)
279+
.await
280+
{
276281
Ok(resp) => resp,
277282
Err(TaskError::ShuttingDown) => {
278283
gum::info!(
@@ -342,6 +347,8 @@ impl RunningTask {
342347
async fn do_request(
343348
&mut self,
344349
validator: &AuthorityDiscoveryId,
350+
nerwork_error_freq: &mut gum::Freq,
351+
canceled_freq: &mut gum::Freq,
345352
) -> std::result::Result<ChunkFetchingResponse, TaskError> {
346353
gum::trace!(
347354
target: LOG_TARGET,
@@ -386,7 +393,9 @@ impl RunningTask {
386393
Err(TaskError::PeerError)
387394
},
388395
Err(RequestError::NetworkError(err)) => {
389-
gum::debug!(
396+
gum::warn_if_frequent!(
397+
freq: nerwork_error_freq,
398+
max_rate: gum::Times::PerHour(100),
390399
target: LOG_TARGET,
391400
origin = ?validator,
392401
relay_parent = ?self.relay_parent,
@@ -400,7 +409,9 @@ impl RunningTask {
400409
Err(TaskError::PeerError)
401410
},
402411
Err(RequestError::Canceled(oneshot::Canceled)) => {
403-
gum::debug!(
412+
gum::warn_if_frequent!(
413+
freq: canceled_freq,
414+
max_rate: gum::Times::PerHour(100),
404415
target: LOG_TARGET,
405416
origin = ?validator,
406417
relay_parent = ?self.relay_parent,

0 commit comments

Comments
 (0)