Add proper span handling to factories through extension trait #300

Merged: 1 commit, Dec 17, 2024
2 changes: 1 addition & 1 deletion ractor/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "ractor"
-version = "0.13.5"
+version = "0.14.0"
 authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
 description = "A actor framework for Rust"
 documentation = "https://docs.rs/ractor"
87 changes: 78 additions & 9 deletions ractor/src/factory/job.rs
@@ -11,6 +11,7 @@
 use std::{hash::Hash, time::SystemTime};

 use bon::Builder;
+use tracing::Span;

 use crate::{concurrency::Duration, Message};
 use crate::{ActorRef, RpcReplyPort};
@@ -40,27 +41,91 @@
 impl<T: Debug + Hash + Send + Sync + Clone + Eq + PartialEq + 'static> JobKey for T {}

 /// Represents options for the specified job
-#[derive(Debug, Eq, PartialEq, Clone)]
+#[derive(Debug, PartialEq, Clone)]
 pub struct JobOptions {
     /// Time job was submitted from the client
-    pub submit_time: SystemTime,
+    submit_time: SystemTime,
     /// Time job was processed by the factory
-    pub factory_time: SystemTime,
+    factory_time: SystemTime,
     /// Time job was sent to a worker
-    pub worker_time: SystemTime,
+    worker_time: SystemTime,
     /// Time-to-live for the job
-    pub ttl: Option<Duration>,
+    ttl: Option<Duration>,
+    /// The parent span we want to propagate to the worker.
+    /// Spans don't propagate over the wire in networks
+    span: Option<Span>,
 }

-impl Default for JobOptions {
-    fn default() -> Self {
+impl JobOptions {
+    /// Create a new [JobOptions] instance, optionally supplying the ttl for the job
+    ///
+    /// * `ttl` - The Time-to-live specification for this job, which is the maximum amount
+    ///   of time the job can remain in the factory's (or worker's) queue before being expired
+    ///   and discarded
+    ///
+    /// Returns a new [JobOptions] instance.
+    pub fn new(ttl: Option<Duration>) -> Self {
+        let span = {
+            #[cfg(feature = "message_span_propogation")]
+            {
+                Some(Span::current())
+            }
+            #[cfg(not(feature = "message_span_propogation"))]
+            {
+                None
+            }
+        };
         Self {
             submit_time: SystemTime::now(),
             factory_time: SystemTime::now(),
             worker_time: SystemTime::now(),
-            ttl: None,
+            ttl,
+            span,
         }
     }
+
+    /// Retrieve the TTL for this job
+    pub fn ttl(&self) -> Option<Duration> {
+        self.ttl
+    }
+
+    /// Set the TTL for this job
+    pub fn set_ttl(&mut self, ttl: Option<Duration>) {
+        self.ttl = ttl;
+    }
+
+    /// Time the job was submitted to the factory
+    /// (i.e. the time `cast` was called)
+    pub fn submit_time(&self) -> SystemTime {
+        self.submit_time
+    }
+
+    /// Time the job was dispatched to a worker
+    pub fn worker_time(&self) -> SystemTime {
+        self.worker_time
+    }
+
+    /// Time the job was received by the factory and first either dispatched
+    /// or enqueued to the factory's queue
+    pub fn factory_time(&self) -> SystemTime {
+        self.factory_time
+    }
+
+    /// Clone the [Span] and return it which is attached
+    /// to this [JobOptions] instance.
+    pub fn span(&self) -> Option<Span> {
+        self.span.clone()
+    }
+
+    pub(crate) fn take_span(&mut self) -> Option<Span> {
+        self.span.take()
+    }
 }
+
+impl Default for JobOptions {
+    fn default() -> Self {
+        Self::new(None)
+    }
+}

 #[cfg(feature = "cluster")]
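Because the timestamp and TTL fields are now private, downstream code constructs options via JobOptions::new and reads them back through the new accessors. A minimal usage sketch, assuming the public paths ractor::factory::JobOptions and ractor::concurrency::Duration shown elsewhere in this PR:

    use ractor::concurrency::Duration;
    use ractor::factory::JobOptions;

    fn main() {
        // new() stamps all three timestamps with SystemTime::now() and, when the
        // "message_span_propogation" feature is enabled, captures Span::current().
        let mut options = JobOptions::new(Some(Duration::from_millis(100)));
        assert_eq!(options.ttl(), Some(Duration::from_millis(100)));

        // The TTL remains adjustable after construction.
        options.set_ttl(None);
        assert!(options.ttl().is_none());

        // The captured span, if any, can be cloned out for instrumentation.
        let _maybe_span = options.span();
    }
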
@@ -86,7 +151,10 @@

     fn from_bytes(mut data: Vec<u8>) -> Self {
         if data.len() != 16 {
-            Self::default()
+            Self {
+                span: None,
+                ..Default::default()
+            }
         } else {
             let ttl_bytes = data.split_off(8);

@@ -100,6 +168,7 @@
             } else {
                 None
             },
+            span: None,
             ..Default::default()
         }
     }
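Note that both branches pin span: None, since, as the field's doc comment says, spans don't propagate over the wire, a deserialized job never carries one. The sketch below illustrates the 16-byte split the decoder relies on; only the split_off(8) shape is taken from the hunk above, the encoding of each half is an assumption:

    fn main() {
        // A 16-byte payload: the leading 8 bytes hold one field (the submit
        // time, by the look of the decoder), the trailing 8 bytes hold the TTL.
        let mut data: Vec<u8> = vec![0u8; 16];

        // Vec::split_off(8) leaves bytes 0..8 in `data` and returns bytes 8..16.
        let ttl_bytes = data.split_off(8);
        assert_eq!((data.len(), ttl_bytes.len()), (8, 8));

        // Assumption: an all-zero TTL decodes to "no TTL", matching the
        // `if .. { Some(..) } else { None }` shape in the hunk above.
        let raw = u64::from_be_bytes(ttl_bytes.try_into().unwrap());
        let ttl = (raw > 0).then(|| std::time::Duration::from_millis(raw));
        assert!(ttl.is_none());
    }
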
@@ -360,7 +429,7 @@
 /// 1. Regular loadshed events will cause this message to be retried if there are still retries left
 ///    and the job isn't expired unless you explicitly call `completed()` in the discard handler.
 /// 2. Consumable types are not well supported here without some wrapping in Option types, which is
 ///    because the value is handled everywhere as `&mut ref` due to the drop implementation requiring that
 ///    it be so. This means that RPCs using [crate::concurrency::oneshot]s likely won't work without
 ///    some real painful ergonomics.
 /// 3. Upon successful handling of the job, you need to mark it as `completed()` at the end of your
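Point 2 deserves a concrete illustration: since retriable jobs are only ever visible through &mut, a consumable value such as a oneshot reply sender must be stored as an Option and moved out with take(). A self-contained sketch using tokio's oneshot directly (the type and function names here are illustrative, not from the crate):

    use tokio::sync::oneshot;

    struct RetriableRpc {
        // Stored as Option so it can be consumed through a &mut borrow.
        reply: Option<oneshot::Sender<u64>>,
    }

    fn answer(rpc: &mut RetriableRpc) {
        // take() moves the sender out, leaving None behind for any retry path.
        if let Some(tx) = rpc.reply.take() {
            let _ = tx.send(42);
        }
    }

    fn main() {
        let (tx, mut rx) = oneshot::channel();
        let mut rpc = RetriableRpc { reply: Some(tx) };
        answer(&mut rpc);
        assert_eq!(rx.try_recv().unwrap(), 42);
    }
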
57 changes: 21 additions & 36 deletions ractor/src/factory/mod.rs
@@ -66,53 +66,38 @@
 //! /// the business logic for each message that will be done in parallel.
 //! struct ExampleWorker;
 //! #[cfg_attr(feature = "async-trait", ractor::async_trait)]
-//! impl Actor for ExampleWorker {
-//!     type Msg = WorkerMessage<(), ExampleMessage>;
-//!     type State = WorkerStartContext<(), ExampleMessage, ()>;
-//!     type Arguments = WorkerStartContext<(), ExampleMessage, ()>;
+//! impl Worker for ExampleWorker {
+//!     type Key = ();
+//!     type Message = ExampleMessage;
+//!     type State = ();
+//!     type Arguments = ();
 //!     async fn pre_start(
 //!         &self,
-//!         _myself: ActorRef<Self::Msg>,
+//!         wid: WorkerId,
+//!         factory: &ActorRef<FactoryMessage<(), ExampleMessage>>,
 //!         startup_context: Self::Arguments,
 //!     ) -> Result<Self::State, ActorProcessingErr> {
 //!         Ok(startup_context)
 //!     }
 //!     async fn handle(
 //!         &self,
-//!         _myself: ActorRef<Self::Msg>,
-//!         message: Self::Msg,
-//!         state: &mut Self::State,
+//!         wid: WorkerId,
+//!         factory: &ActorRef<FactoryMessage<(), ExampleMessage>>,
+//!         Job {msg, key, ..}: Job<(), ExampleMessage>,
+//!         _state: &mut Self::State,
 //!     ) -> Result<(), ActorProcessingErr> {
-//!         match message {
-//!             WorkerMessage::FactoryPing(time) => {
-//!                 // This is a message which all factory workers **must**
-//!                 // adhere to. It is a background processing message from the
-//!                 // factory which is used for (a) metrics and (b) detecting
-//!                 // stuck workers, i.e. workers which aren't making progress
-//!                 // processing their messages
-//!                 state
-//!                     .factory
-//!                     .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?;
-//!             }
-//!             WorkerMessage::Dispatch(job) => {
-//!                 // Actual business logic that we want to parallelize
-//!                 tracing::trace!("Worker {} received {:?}", state.wid, job.msg);
-//!                 match job.msg {
-//!                     ExampleMessage::PrintValue(value) => {
-//!                         tracing::info!("Worker {} printing value {value}", state.wid);
-//!                     }
-//!                     ExampleMessage::EchoValue(value, reply) => {
-//!                         tracing::info!("Worker {} echoing value {value}", state.wid);
-//!                         let _ = reply.send(value);
-//!                     }
-//!                 }
-//!                 // job finished, on success or err we report back to the factory
-//!                 state
-//!                     .factory
-//!                     .cast(FactoryMessage::Finished(state.wid, job.key))?;
-//!             }
-//!         }
-//!         Ok(())
+//!         // Actual business logic that we want to parallelize
+//!         tracing::trace!("Worker {} received {:?}", wid, msg);
+//!         match msg {
+//!             ExampleMessage::PrintValue(value) => {
+//!                 tracing::info!("Worker {} printing value {value}", wid);
+//!             }
+//!             ExampleMessage::EchoValue(value, reply) => {
+//!                 tracing::info!("Worker {} echoing value {value}", wid);
+//!                 let _ = reply.send(value);
+//!             }
+//!         }
+//!         Ok(key)
 //!     }
 //! }
 //! /// Used by the factory to build new [ExampleWorker]s.
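The worker half above is only part of the picture; the span is captured on the submission side. A hedged sketch of a client dispatching a job (the Job field names are taken from the test diffs below, and the surrounding factory setup is omitted):

    // Inside some instrumented client code:
    let span = tracing::info_span!("client_request");
    let _guard = span.enter();

    // JobOptions::default() runs JobOptions::new(None), which captures
    // Span::current() when "message_span_propogation" is enabled, so
    // worker-side trace events are parented under `client_request`.
    factory.cast(FactoryMessage::Dispatch(Job {
        key: (),
        msg: ExampleMessage::PrintValue(5),
        options: JobOptions::default(),
        accepted: None,
    }))?;
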
@@ -200,7 +185,7 @@ pub use factoryimpl::{Factory, FactoryArguments, FactoryArgumentsBuilder};
 pub use job::{Job, JobKey, JobOptions, MessageRetryStrategy, RetriableMessage};
 pub use lifecycle::FactoryLifecycleHooks;
 pub use worker::{
-    DeadMansSwitchConfiguration, WorkerBuilder, WorkerCapacityController, WorkerMessage,
+    DeadMansSwitchConfiguration, Worker, WorkerBuilder, WorkerCapacityController, WorkerMessage,
     WorkerProperties, WorkerStartContext,
 };
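With Worker now re-exported, implementors can pull everything from one path, for example (the exact import set is illustrative; Worker is the new addition):

    use ractor::factory::{FactoryMessage, Job, Worker, WorkerId};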

10 changes: 2 additions & 8 deletions ractor/src/factory/queues.rs
@@ -423,10 +423,7 @@ mod tests {
             key: 99,
             accepted: None,
             msg: (),
-            options: JobOptions {
-                ttl: Some(Duration::from_millis(1)),
-                ..Default::default()
-            },
+            options: JobOptions::new(Some(Duration::from_millis(1))),
         });

         let oldest = queue.discard_oldest();
@@ -480,10 +477,7 @@
             key: 99,
             accepted: None,
             msg: (),
-            options: JobOptions {
-                ttl: Some(Duration::from_millis(1)),
-                ..Default::default()
-            },
+            options: JobOptions::new(Some(Duration::from_millis(1))),
         });

         // should discard lowest pri first
46 changes: 18 additions & 28 deletions ractor/src/factory/tests/basic.rs
@@ -55,47 +55,37 @@ struct TestWorker {
 }

 #[cfg_attr(feature = "async-trait", crate::async_trait)]
-impl Actor for TestWorker {
-    type Msg = WorkerMessage<TestKey, TestMessage>;
-    type State = Self::Arguments;
-    type Arguments = WorkerStartContext<TestKey, TestMessage, ()>;
+impl Worker for TestWorker {
+    type Key = TestKey;
+    type Message = TestMessage;
+    type State = ();
+    type Arguments = ();

     async fn pre_start(
         &self,
-        _myself: ActorRef<Self::Msg>,
+        _wid: WorkerId,
+        _factory: &ActorRef<FactoryMessage<TestKey, TestMessage>>,
         startup_context: Self::Arguments,
     ) -> Result<Self::State, ActorProcessingErr> {
         Ok(startup_context)
     }

     async fn handle(
         &self,
-        _myself: ActorRef<Self::Msg>,
-        message: Self::Msg,
-        state: &mut Self::State,
-    ) -> Result<(), ActorProcessingErr> {
-        match message {
-            WorkerMessage::FactoryPing(time) => {
-                state
-                    .factory
-                    .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?;
-            }
-            WorkerMessage::Dispatch(job) => {
-                tracing::debug!("Worker received {:?}", job.msg);
-
-                self.counter.fetch_add(1, Ordering::Relaxed);
-
-                if let Some(timeout_ms) = self.slow {
-                    crate::concurrency::sleep(Duration::from_millis(timeout_ms)).await;
-                }
-
-                // job finished, on success or err we report back to the factory
-                state
-                    .factory
-                    .cast(FactoryMessage::Finished(state.wid, job.key))?;
-            }
-        }
-        Ok(())
+        _wid: WorkerId,
+        _factory: &ActorRef<FactoryMessage<TestKey, TestMessage>>,
+        Job { msg, key, .. }: Job<Self::Key, Self::Message>,
+        _state: &mut Self::State,
+    ) -> Result<TestKey, ActorProcessingErr> {
+        tracing::debug!("Worker received {:?}", msg);
+
+        self.counter.fetch_add(1, Ordering::Relaxed);
+
+        if let Some(timeout_ms) = self.slow {
+            crate::concurrency::sleep(Duration::from_millis(timeout_ms)).await;
+        }
+
+        Ok(key)
     }
 }
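The test now reads as plain business logic because the Worker plumbing owns the factory protocol. A sketch of what that plumbing plausibly does on the implementor's behalf (an assumption reconstructed from the removed lines above, not the crate's actual adapter code):

    // Hypothetical dispatch loop inside the Worker-to-Actor adapter:
    match message {
        // Liveness pings are answered automatically...
        WorkerMessage::FactoryPing(time) => {
            factory.cast(FactoryMessage::WorkerPong(wid, time.elapsed()))?;
        }
        // ...and completion is reported for you, which is why Worker::handle
        // can simply return the job key on success.
        WorkerMessage::Dispatch(job) => {
            let key = worker.handle(wid, &factory, job, state).await?;
            factory.cast(FactoryMessage::Finished(wid, key))?;
        }
    }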

41 changes: 15 additions & 26 deletions ractor/src/factory/tests/dynamic_pool.rs
@@ -48,43 +48,32 @@ struct TestWorker {
 impl crate::Message for TestMessage {}

 #[cfg_attr(feature = "async-trait", crate::async_trait)]
-impl Actor for TestWorker {
-    type Msg = WorkerMessage<TestKey, TestMessage>;
+impl Worker for TestWorker {
+    type Key = TestKey;
+    type Message = TestMessage;
     type State = Self::Arguments;
-    type Arguments = WorkerStartContext<TestKey, TestMessage, ()>;
+    type Arguments = ();

     async fn pre_start(
         &self,
-        _myself: ActorRef<Self::Msg>,
+        _wid: WorkerId,
+        _factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
         startup_context: Self::Arguments,
     ) -> Result<Self::State, ActorProcessingErr> {
         Ok(startup_context)
     }

     async fn handle(
         &self,
-        _myself: ActorRef<Self::Msg>,
-        message: Self::Msg,
-        state: &mut Self::State,
-    ) -> Result<(), ActorProcessingErr> {
-        match message {
-            WorkerMessage::FactoryPing(time) => {
-                state
-                    .factory
-                    .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?;
-            }
-            WorkerMessage::Dispatch(job) => {
-                tracing::debug!("Worker received {:?}", job.msg);
-
-                self.id_map.insert(state.wid);
-
-                // job finished, on success or err we report back to the factory
-                state
-                    .factory
-                    .cast(FactoryMessage::Finished(state.wid, job.key))?;
-            }
-        }
-        Ok(())
+        wid: WorkerId,
+        _factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
+        Job { msg, key, .. }: Job<Self::Key, Self::Message>,
+        _state: &mut Self::State,
+    ) -> Result<Self::Key, ActorProcessingErr> {
+        tracing::debug!("Worker received {:?}", msg);
+
+        self.id_map.insert(wid);
+        Ok(key)
     }
 }
