Skip to content

Commit 1e715fe

Browse files
Replay aware logger (#48)
* Added replay aware tracing filter * A few changes to the replay aware filter feature: * We now always emit the span for the endpoint handle method. This seems a reasonable default, and doesn't require the tracing-subscriber dependency (we already pull in tracing as a dependency anyway) * Renamed the span name and the replaying field filter, to avoid confusion and clashes * Now the replay aware filter will look for the exact span name created by the handler. This should make the matching of the exact span we want to use for filtering more robust. * Flip the replaying field not only when awaiting an async result, but also when executing `sys_` functions on the state machine. Those are the ones that can cause state transitions. * Simplify the tracing example * Rename feature to `tracing-span-filter` --------- Co-authored-by: Harsha Teja Kanna <h7kanna@gmail.com>
1 parent c4243fe commit 1e715fe

File tree

7 files changed

+188
-6
lines changed

7 files changed

+188
-6
lines changed

Cargo.toml

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,16 @@ license = "MIT"
77
repository = "https://github.com/restatedev/sdk-rust"
88
rust-version = "1.76.0"
99

10+
[[example]]
11+
name = "tracing"
12+
path = "examples/tracing.rs"
13+
required-features = ["tracing-span-filter"]
14+
1015
[features]
11-
default = ["http_server", "rand", "uuid"]
16+
default = ["http_server", "rand", "uuid", "tracing-span-filter"]
1217
hyper = ["dep:hyper", "http-body-util", "restate-sdk-shared-core/http"]
1318
http_server = ["hyper", "hyper/server", "hyper/http2", "hyper-util", "tokio/net", "tokio/signal", "tokio/macros"]
14-
19+
tracing-span-filter = ["dep:tracing-subscriber"]
1520

1621
[dependencies]
1722
bytes = "1.6.1"
@@ -31,11 +36,12 @@ thiserror = "1.0.63"
3136
tokio = { version = "1", default-features = false, features = ["sync"] }
3237
tower-service = "0.3"
3338
tracing = "0.1"
39+
tracing-subscriber = { version = "0.3", features = ["registry"], optional = true }
3440
uuid = { version = "1.10.0", optional = true }
3541

3642
[dev-dependencies]
3743
tokio = { version = "1", features = ["full"] }
38-
tracing-subscriber = "0.3"
44+
tracing-subscriber = { version = "0.3", features = ["env-filter", "registry"] }
3945
trybuild = "1.0"
4046
reqwest = { version = "0.12", features = ["json"] }
4147
rand = "0.8.5"

examples/tracing.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
use restate_sdk::prelude::*;
2+
use std::time::Duration;
3+
use tracing::info;
4+
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};
5+
6+
#[restate_sdk::service]
7+
trait Greeter {
8+
async fn greet(name: String) -> Result<String, HandlerError>;
9+
}
10+
11+
struct GreeterImpl;
12+
13+
impl Greeter for GreeterImpl {
14+
async fn greet(&self, ctx: Context<'_>, name: String) -> Result<String, HandlerError> {
15+
info!("Before sleep");
16+
ctx.sleep(Duration::from_secs(61)).await?; // More than suspension timeout to trigger replay
17+
info!("After sleep");
18+
Ok(format!("Greetings {name}"))
19+
}
20+
}
21+
22+
#[tokio::main]
23+
async fn main() {
24+
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
25+
.unwrap_or_else(|_| "info,restate_sdk=debug".into());
26+
let replay_filter = restate_sdk::filter::ReplayAwareFilter;
27+
tracing_subscriber::registry()
28+
.with(
29+
tracing_subscriber::fmt::layer()
30+
.with_filter(env_filter)
31+
.with_filter(replay_filter),
32+
)
33+
.init();
34+
HttpServer::new(Endpoint::builder().bind(GreeterImpl.serve()).build())
35+
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
36+
.await;
37+
}

src/endpoint/context.rs

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ pub struct ContextInternalInner {
3030
pub(crate) read: InputReceiver,
3131
pub(crate) write: OutputSender,
3232
pub(super) handler_state: HandlerStateNotifier,
33+
34+
/// Cached value of the span's replaying field. We remember it here because setting the field on the span might be expensive (it's guarded behind locks and other stuff).
35+
/// For details, see [ContextInternalInner::maybe_flip_span_replaying_field]
36+
pub(super) span_replaying_field_state: bool,
3337
}
3438

3539
impl ContextInternalInner {
@@ -44,17 +48,29 @@ impl ContextInternalInner {
4448
read,
4549
write,
4650
handler_state,
51+
span_replaying_field_state: false,
4752
}
4853
}
4954

5055
pub(super) fn fail(&mut self, e: Error) {
56+
self.maybe_flip_span_replaying_field();
5157
self.vm.notify_error(
5258
CoreError::new(500u16, e.0.to_string())
5359
.with_stacktrace(Cow::Owned(format!("{:#}", e.0))),
5460
None,
5561
);
5662
self.handler_state.mark_error(e);
5763
}
64+
65+
pub(super) fn maybe_flip_span_replaying_field(&mut self) {
66+
if !self.span_replaying_field_state && self.vm.is_replaying() {
67+
tracing::Span::current().record("restate.sdk.is_replaying", true);
68+
self.span_replaying_field_state = true;
69+
} else if self.span_replaying_field_state && !self.vm.is_replaying() {
70+
tracing::Span::current().record("restate.sdk.is_replaying", false);
71+
self.span_replaying_field_state = false;
72+
}
73+
}
5874
}
5975

6076
/// Internal context interface.
@@ -190,6 +206,7 @@ impl ContextInternal {
190206
},
191207
))
192208
});
209+
inner_lock.maybe_flip_span_replaying_field();
193210

194211
match input_result {
195212
Ok(Ok(i)) => {
@@ -223,6 +240,7 @@ impl ContextInternal {
223240
) -> impl Future<Output = Result<Option<T>, TerminalError>> + Send {
224241
let mut inner_lock = must_lock!(self.inner);
225242
let handle = unwrap_or_trap!(inner_lock, inner_lock.vm.sys_state_get(key.to_owned()));
243+
inner_lock.maybe_flip_span_replaying_field();
226244

227245
let poll_future = get_async_result(Arc::clone(&self.inner), handle).map(|res| match res {
228246
Ok(Value::Void) => Ok(Ok(None)),
@@ -246,6 +264,7 @@ impl ContextInternal {
246264
pub fn get_keys(&self) -> impl Future<Output = Result<Vec<String>, TerminalError>> + Send {
247265
let mut inner_lock = must_lock!(self.inner);
248266
let handle = unwrap_or_trap!(inner_lock, inner_lock.vm.sys_state_get_keys());
267+
inner_lock.maybe_flip_span_replaying_field();
249268

250269
let poll_future = get_async_result(Arc::clone(&self.inner), handle).map(|res| match res {
251270
Ok(Value::Failure(f)) => Ok(Err(f.into())),
@@ -266,6 +285,7 @@ impl ContextInternal {
266285
match t.serialize() {
267286
Ok(b) => {
268287
let _ = inner_lock.vm.sys_state_set(key.to_owned(), b);
288+
inner_lock.maybe_flip_span_replaying_field();
269289
}
270290
Err(e) => {
271291
inner_lock.fail(Error::serialization("set_state", e));
@@ -274,11 +294,15 @@ impl ContextInternal {
274294
}
275295

276296
pub fn clear(&self, key: &str) {
277-
let _ = must_lock!(self.inner).vm.sys_state_clear(key.to_string());
297+
let mut inner_lock = must_lock!(self.inner);
298+
let _ = inner_lock.vm.sys_state_clear(key.to_string());
299+
inner_lock.maybe_flip_span_replaying_field();
278300
}
279301

280302
pub fn clear_all(&self) {
281-
let _ = must_lock!(self.inner).vm.sys_state_clear_all();
303+
let mut inner_lock = must_lock!(self.inner);
304+
let _ = inner_lock.vm.sys_state_clear_all();
305+
inner_lock.maybe_flip_span_replaying_field();
282306
}
283307

284308
pub fn sleep(
@@ -293,6 +317,7 @@ impl ContextInternal {
293317
inner_lock,
294318
inner_lock.vm.sys_sleep(now + sleep_duration, Some(now))
295319
);
320+
inner_lock.maybe_flip_span_replaying_field();
296321

297322
let poll_future = get_async_result(Arc::clone(&self.inner), handle).map(|res| match res {
298323
Ok(Value::Void) => Ok(Ok(())),
@@ -328,6 +353,7 @@ impl ContextInternal {
328353
);
329354

330355
let call_handle = unwrap_or_trap!(inner_lock, inner_lock.vm.sys_call(target, input));
356+
inner_lock.maybe_flip_span_replaying_field();
331357
drop(inner_lock);
332358

333359
// Let's prepare the two futures here
@@ -411,6 +437,7 @@ impl ContextInternal {
411437
return Either::Right(TrapFuture::<()>::default());
412438
}
413439
};
440+
inner_lock.maybe_flip_span_replaying_field();
414441
drop(inner_lock);
415442

416443
let invocation_id_fut = InterceptErrorFuture::new(
@@ -452,6 +479,7 @@ impl ContextInternal {
452479
) {
453480
let mut inner_lock = must_lock!(self.inner);
454481
let maybe_awakeable_id_and_handle = inner_lock.vm.sys_awakeable();
482+
inner_lock.maybe_flip_span_replaying_field();
455483

456484
let (awakeable_id, handle) = match maybe_awakeable_id_and_handle {
457485
Ok((s, handle)) => (s, handle),
@@ -512,6 +540,7 @@ impl ContextInternal {
512540
) -> impl Future<Output = Result<T, TerminalError>> + Send {
513541
let mut inner_lock = must_lock!(self.inner);
514542
let handle = unwrap_or_trap!(inner_lock, inner_lock.vm.sys_get_promise(name.to_owned()));
543+
inner_lock.maybe_flip_span_replaying_field();
515544
drop(inner_lock);
516545

517546
let poll_future = get_async_result(Arc::clone(&self.inner), handle).map(|res| match res {
@@ -537,6 +566,7 @@ impl ContextInternal {
537566
) -> impl Future<Output = Result<Option<T>, TerminalError>> + Send {
538567
let mut inner_lock = must_lock!(self.inner);
539568
let handle = unwrap_or_trap!(inner_lock, inner_lock.vm.sys_peek_promise(name.to_owned()));
569+
inner_lock.maybe_flip_span_replaying_field();
540570
drop(inner_lock);
541571

542572
let poll_future = get_async_result(Arc::clone(&self.inner), handle).map(|res| match res {
@@ -625,6 +655,7 @@ impl ContextInternal {
625655
};
626656

627657
let _ = inner_lock.vm.sys_write_output(res_to_write);
658+
inner_lock.maybe_flip_span_replaying_field();
628659
}
629660

630661
pub fn end(&self) {
@@ -859,6 +890,7 @@ impl<InvIdFut: Future<Output = Result<String, TerminalError>> + Send> Invocation
859890
let inv_id = cloned_invocation_id_fut.await?;
860891
let mut inner_lock = must_lock!(cloned_ctx);
861892
let _ = inner_lock.vm.sys_cancel_invocation(inv_id);
893+
inner_lock.maybe_flip_span_replaying_field();
862894
drop(inner_lock);
863895
Ok(())
864896
}
@@ -903,6 +935,7 @@ where
903935
let inv_id = cloned_invocation_id_fut.await?;
904936
let mut inner_lock = must_lock!(cloned_ctx);
905937
let _ = inner_lock.vm.sys_cancel_invocation(inv_id);
938+
inner_lock.maybe_flip_span_replaying_field();
906939
drop(inner_lock);
907940
Ok(())
908941
}

src/endpoint/futures/async_result_poll.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ impl Future for VmAsyncResultPollFuture {
128128
}
129129
};
130130

131+
// DoProgress might cause a flip of the replaying state
132+
inner_lock.maybe_flip_span_replaying_field();
133+
131134
// At this point let's try to take the notification
132135
match inner_lock.vm.take_notification(handle) {
133136
Ok(Some(v)) => return Poll::Ready(Ok(v)),

src/endpoint/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::future::poll_fn;
1818
use std::pin::Pin;
1919
use std::sync::Arc;
2020
use std::task::{Context, Poll};
21+
use tracing::{info_span, Instrument};
2122

2223
const DISCOVERY_CONTENT_TYPE: &str = "application/vnd.restate.endpointmanifest.v1+json";
2324

@@ -368,6 +369,13 @@ impl BidiStreamRunner {
368369
.get(&self.svc_name)
369370
.expect("service must exist at this point");
370371

372+
let span = info_span!(
373+
"restate_sdk_endpoint_handle",
374+
"rpc.system" = "restate",
375+
"rpc.service" = self.svc_name,
376+
"rpc.method" = self.handler_name,
377+
"restate.sdk.is_replaying" = false
378+
);
371379
handle(
372380
input_rx,
373381
output_tx,
@@ -376,6 +384,7 @@ impl BidiStreamRunner {
376384
self.handler_name,
377385
svc,
378386
)
387+
.instrument(span)
379388
.await
380389
}
381390
}

src/filter.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
//! Replay aware tracing filter.
2+
3+
use std::fmt::Debug;
4+
use tracing::{
5+
field::{Field, Visit},
6+
span::{Attributes, Record},
7+
Event, Id, Metadata, Subscriber,
8+
};
9+
use tracing_subscriber::{
10+
layer::{Context, Filter},
11+
registry::LookupSpan,
12+
Layer,
13+
};
14+
15+
#[derive(Debug)]
16+
struct ReplayField(bool);
17+
18+
struct ReplayFieldVisitor(bool);
19+
20+
impl Visit for ReplayFieldVisitor {
21+
fn record_bool(&mut self, field: &Field, value: bool) {
22+
if field.name().eq("restate.sdk.is_replaying") {
23+
self.0 = value;
24+
}
25+
}
26+
27+
fn record_debug(&mut self, _field: &Field, _value: &dyn Debug) {}
28+
}
29+
30+
/// Replay aware tracing filter.
///
/// Use this filter to skip tracing events in the service while replaying:
///
/// ```rust,no_run
/// use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};
/// tracing_subscriber::registry()
///     .with(
///         tracing_subscriber::fmt::layer()
///             // Default Env filter to read RUST_LOG
///             .with_filter(tracing_subscriber::EnvFilter::from_default_env())
///             // Replay aware filter
///             .with_filter(restate_sdk::filter::ReplayAwareFilter)
///     )
///     .init();
/// ```
#[derive(Debug, Clone, Copy, Default)]
pub struct ReplayAwareFilter;
47+
48+
impl<S: Subscriber + for<'lookup> LookupSpan<'lookup>> Filter<S> for ReplayAwareFilter {
49+
fn enabled(&self, _meta: &Metadata<'_>, _cx: &Context<'_, S>) -> bool {
50+
true
51+
}
52+
53+
fn event_enabled(&self, event: &Event<'_>, cx: &Context<'_, S>) -> bool {
54+
if let Some(scope) = cx.event_scope(event) {
55+
let iterator = scope.from_root();
56+
for span in iterator {
57+
if span.name() == "restate_sdk_endpoint_handle" {
58+
if let Some(replay) = span.extensions().get::<ReplayField>() {
59+
return !replay.0;
60+
}
61+
}
62+
}
63+
}
64+
true
65+
}
66+
67+
fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
68+
if let Some(span) = ctx.span(id) {
69+
if span.name() == "restate_sdk_endpoint_handle" {
70+
let mut visitor = ReplayFieldVisitor(false);
71+
attrs.record(&mut visitor);
72+
let mut extensions = span.extensions_mut();
73+
extensions.replace::<ReplayField>(ReplayField(visitor.0));
74+
}
75+
}
76+
}
77+
78+
fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
79+
if let Some(span) = ctx.span(id) {
80+
if span.name() == "restate_sdk_endpoint_handle" {
81+
let mut visitor = ReplayFieldVisitor(false);
82+
values.record(&mut visitor);
83+
let mut extensions = span.extensions_mut();
84+
extensions.replace::<ReplayField>(ReplayField(visitor.0));
85+
}
86+
}
87+
}
88+
}
89+
90+
// Empty `Layer` implementation: no method is overridden here, so only the trait's
// provided defaults apply when `ReplayAwareFilter` is used as a `Layer`.
impl<S: Subscriber> Layer<S> for ReplayAwareFilter {}

src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,9 @@
207207
//! }
208208
//! ```
209209
//!
210-
//! For more information, have a look at the [tracing subscriber doc](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/fmt/index.html#filtering-events-with-environment-variables).
210+
//! You can filter out logs *while a handler is being replayed* by configuring the [filter::ReplayAwareFilter].
211+
//!
212+
//! For more information about tracing and logging, have a look at the [tracing subscriber doc](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/fmt/index.html#filtering-events-with-environment-variables).
211213
//!
212214
//! Next, have a look at the other [SDK features](#features).
213215
//!
@@ -218,6 +220,8 @@ pub mod service;
218220
pub mod context;
219221
pub mod discovery;
220222
pub mod errors;
223+
#[cfg(feature = "tracing-span-filter")]
224+
pub mod filter;
221225
#[cfg(feature = "http_server")]
222226
pub mod http_server;
223227
#[cfg(feature = "hyper")]

0 commit comments

Comments
 (0)