Skip to content

Commit

Permalink
feat!(core/stream): move to new sources API
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Apr 26, 2023
1 parent 047e7cd commit 481c28c
Show file tree
Hide file tree
Showing 17 changed files with 525 additions and 269 deletions.
5 changes: 2 additions & 3 deletions elfo-core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,9 +506,8 @@ impl<C, K, S> Context<C, K, S> {
}
},
option = self.sources.next(), if !self.sources.is_empty() => {
// Sources cannot return `None` for now.
// TODO: check unicycle again.
let envelope = option.expect("source cannot return None");
// TODO: is it required if mailbox becomes source?
let envelope = ward!(option, continue);

if let Some(envelope) = self.post_recv(envelope) {
return Some(envelope);
Expand Down
26 changes: 13 additions & 13 deletions elfo-core/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ pub(crate) enum MessageKind {
}

impl<M> Envelope<M> {
pub(crate) fn new(message: M, kind: MessageKind) -> Self {
Self::with_trace_id(message, kind, crate::scope::trace_id())
}

pub(crate) fn with_trace_id(message: M, kind: MessageKind, trace_id: TraceId) -> Self {
Self {
created_time: Instant::now(),
trace_id,
kind,
message,
}
}

#[inline]
pub fn trace_id(&self) -> TraceId {
self.trace_id
Expand Down Expand Up @@ -59,19 +72,6 @@ impl<M> Envelope<M> {
}

impl<M: Message> Envelope<M> {
pub(crate) fn new(message: M, kind: MessageKind) -> Self {
Self::with_trace_id(message, kind, crate::scope::trace_id())
}

pub(crate) fn with_trace_id(message: M, kind: MessageKind, trace_id: TraceId) -> Self {
Self {
created_time: Instant::now(),
trace_id,
kind,
message,
}
}

pub(crate) fn upcast(self) -> Envelope {
Envelope {
created_time: self.created_time,
Expand Down
24 changes: 15 additions & 9 deletions elfo-core/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub struct Unattached<H> {
}

impl<H> Unattached<H> {
pub(crate) fn new(source: SourceArc<impl SourceStream>, handle: H) -> Self {
pub(crate) fn new(source: SourceArc<impl SourceStream + ?Sized>, handle: H) -> Self {
Self {
source: source.inner,
handle,
Expand All @@ -106,17 +106,22 @@ impl<H> Unattached<H> {

// === SourceArc ===

pub(crate) struct SourceArc<S> {
pub(crate) struct SourceArc<S: ?Sized> {
inner: UntypedSourceArc,
marker: PhantomData<S>,
}

impl<S: ?Sized> SourceArc<S> {
/// TODO
pub(crate) fn from_untyped(inner: UntypedSourceArc) -> Self {
let marker = PhantomData;
Self { inner, marker }
}
}

impl<S: SourceStream> SourceArc<S> {
pub(crate) fn new(source: S) -> Self {
Self {
inner: UntypedSourceArc::new(source),
marker: PhantomData,
}
Self::from_untyped(UntypedSourceArc::new(source))
}

pub(crate) fn lock(&self) -> SourceStreamGuard<'_, S> {
Expand All @@ -127,7 +132,7 @@ impl<S: SourceStream> SourceArc<S> {
}
}

impl<S> Clone for SourceArc<S> {
impl<S: ?Sized> Clone for SourceArc<S> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
Expand All @@ -138,7 +143,7 @@ impl<S> Clone for SourceArc<S> {

// === SourceStreamGuard ===

pub(crate) struct SourceStreamGuard<'a, S> {
pub(crate) struct SourceStreamGuard<'a, S: ?Sized> {
inner: MutexGuard<'a, StreamWithWaker<dyn SourceStream>>,
marker: PhantomData<S>,
}
Expand All @@ -163,6 +168,7 @@ impl<S: 'static> SourceStreamGuard<'_, S> {

// === UntypedSourceArc ===

// TODO: visibility.
#[derive(Clone)]
pub(crate) struct UntypedSourceArc(Arc<Mutex<StreamWithWaker<dyn SourceStream>>>);

Expand All @@ -172,7 +178,7 @@ struct StreamWithWaker<S: ?Sized> {
}

impl UntypedSourceArc {
fn new(stream: impl SourceStream) -> Self {
pub(crate) fn new(stream: impl SourceStream) -> Self {
Self(Arc::new(Mutex::new(StreamWithWaker {
waker: noop_waker(),
stream,
Expand Down
Loading

0 comments on commit 481c28c

Please sign in to comment.