Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
cleanup overseer a bit, make dummy work
Browse files Browse the repository at this point in the history
  • Loading branch information
drahnr committed Apr 20, 2022
1 parent efb1d10 commit 462a93e
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 63 deletions.
13 changes: 9 additions & 4 deletions node/overseer/overseer-gen/examples/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ use std::collections::HashMap;
pub struct AwesomeSubSys;

impl ::polkadot_overseer_gen::Subsystem<AwesomeSubSysContext, Yikes> for AwesomeSubSys {
fn start(self, ctx: AwesomeSubSysContext) -> SpawnedSubsystem<Yikes> {
ctx.spawn("awesome", Box::pin(async move { ctx.send_message(Plinko).await }));
fn start(self, mut ctx: AwesomeSubSysContext) -> SpawnedSubsystem<Yikes> {
let mut sender = ctx.sender().clone();

ctx.spawn("awesome", Box::pin(async move { sender.send_message(Plinko).await }))
.unwrap();
unimplemented!("starting yay!")
}
}
Expand All @@ -19,8 +22,10 @@ impl ::polkadot_overseer_gen::Subsystem<AwesomeSubSysContext, Yikes> for Awesome
pub struct GoblinTower;

impl ::polkadot_overseer_gen::Subsystem<GoblinTowerContext, Yikes> for GoblinTower {
fn start(self, ctx: GoblinTowerContext) -> SpawnedSubsystem<Yikes> {
ctx.spawn("awesome", Box::pin(async move { ctx.send_message(MsgStrukt(0u8)).await }));
fn start(self, mut ctx: GoblinTowerContext) -> SpawnedSubsystem<Yikes> {
let mut sender = ctx.sender().clone();
ctx.spawn("awesome", Box::pin(async move { sender.send_message(MsgStrukt(0u8)).await }))
.unwrap();
unimplemented!("welcum")
}
}
Expand Down
3 changes: 3 additions & 0 deletions node/overseer/overseer-gen/proc-macro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ default = ["expand"]
# write the expanded version to a `overlord-expansion.[a-f0-9]{10}.rs`
# in the `OUT_DIR` as defined by `cargo` for the `expander` crate.
expand = []
# Create directional message consuming / outgoing graph.
# Generates: `${OUT_DIR}/${overseer|lowercase}-subsystem-messaging.dot`
graph = []
3 changes: 3 additions & 0 deletions node/overseer/overseer-gen/proc-macro/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
fn main() {
// populate OUT_DIR
}
88 changes: 29 additions & 59 deletions node/overseer/overseer-gen/proc-macro/src/impl_subsystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ fn graphviz(
edge.weight().get_ident().expect("Must have a trailing identifier. qed")
)
},
&|_graph, (node_index, subsystem_name)| -> String {
&|_graph, (_node_index, subsystem_name)| -> String {
format!(r#"label="{}""#, subsystem_name,)
},
);
Expand All @@ -54,7 +54,8 @@ fn graphviz(
pub(crate) fn impl_subsystem(info: &OverseerInfo) -> Result<TokenStream> {
let mut ts = TokenStream::new();

let span = info.overseer_name.span();
let overseer_name = &info.overseer_name;
let span = overseer_name.span();
let all_messages_wrapper = &info.message_wrapper;
let support_crate = info.support_crate_name();

Expand All @@ -81,21 +82,22 @@ pub(crate) fn impl_subsystem(info: &OverseerInfo) -> Result<TokenStream> {
consuming_lut.insert(&ssf.message_to_consume, (ssf.generic.clone(), node_index));
}

for (message_ty, (consuming_subsystem_ident, consuming_node_index)) in consuming_lut.iter() {
for (message_ty, (_consuming_subsystem_ident, consuming_node_index)) in consuming_lut.iter() {
// match the outgoing ones that were registered above with the consumed message
if let Some(origin_subsystems) = outgoing_lut.get(message_ty) {
for (origin_subsystem, sending_node_index) in origin_subsystems.iter() {
for (_origin_subsystem_ident, sending_node_index) in origin_subsystems.iter() {
graph.add_edge(*sending_node_index, *consuming_node_index, (*message_ty).clone());
}
}
}

// all outgoing edges are now usable to deriver everything we need
// All outgoing edges are now usable to derive everything we need
for node_index in graph.node_indices() {
let subsystem_name = graph[node_index].to_string();
let outgoing_wrapper = Ident::new(&(subsystem_name + "OutgoingMessages"), span);

// cannot be a hashmap, duplicate keys and sorting required
// maps outgoing messages to the subsystem that consumes it
let outgoing_to_consumer = graph
.edges_directed(node_index, Direction::Outgoing)
.map(|edge| {
Expand All @@ -105,6 +107,7 @@ pub(crate) fn impl_subsystem(info: &OverseerInfo) -> Result<TokenStream> {
})
.collect::<Result<Vec<(Ident, Ident)>>>()?;

// Split it for usage with quote
let outgoing_variant = outgoing_to_consumer.iter().map(|x| x.0.clone()).collect::<Vec<_>>();
let subsystem_generic = outgoing_to_consumer.into_iter().map(|x| x.1).collect::<Vec<_>>();

Expand All @@ -121,32 +124,37 @@ pub(crate) fn impl_subsystem(info: &OverseerInfo) -> Result<TokenStream> {
})
}

if true {
let mut f = std::fs::OpenOptions::new()
// Dump the graph to file.
if cfg!(feature = "graph") {
let path = std::path::PathBuf::from(
std::env::var("OUT_DIR").expect("build.rs exists, hence OUT_DIR is populated. qed"),
)
.join(overseer_name.to_string().to_lowercase() + "-subsystem-messaging.dot");
if let Err(e) = std::fs::OpenOptions::new()
.truncate(true)
.create(true)
.write(true)
.open("/tmp/foo.dot")
.expect("Opening that file works. qed");
graphviz(&graph, &mut f).expect("Write of dot graph should work. qed");
.open(&path)
.and_then(|mut f| graphviz(&graph, &mut f))
{
eprintln!("Failed to write dot graph to {}: {:?}", path.display(), e);
}
}

// Create all subsystem specific types, one by one
for ssf in info.subsystems() {
let subsystem_name = ssf.generic.to_string();
let subsystem_sender_name = Ident::new(&(subsystem_name.clone() + "Sender"), span);
let subsystem_ctx_name = Ident::new(&(subsystem_name.clone() + "Context"), span);

let outgoing_wrapper = Ident::new(&(subsystem_name.clone() + "OutgoingMessages"), span);
let consumes = ssf
.message_to_consume
.get_ident()
.cloned()
.unwrap_or_else(|| Ident::new("()", Span::call_site()));
let consumes = ssf.message_to_consume.get_ident().cloned().expect(
"Consumable message must have an trailing Ident which was checked in `Parse`. qed",
);

ts.extend(impl_wrapper_enum(&outgoing_wrapper, ssf.messages_to_send.as_slice())?);

ts.extend(impl_subsystem_sender(
ssf,
&all_messages_wrapper,
support_crate,
&outgoing_wrapper,
Expand All @@ -164,6 +172,7 @@ pub(crate) fn impl_subsystem(info: &OverseerInfo) -> Result<TokenStream> {
Ok(ts)
}

/// Extract the final component of the message type path as used in the `#[subsystem(consumes: path::to::Foo)]` annotation.
fn to_variant(path: &Path, span: Span) -> Result<Ident> {
let ident = path
.segments
Expand Down Expand Up @@ -205,46 +214,7 @@ pub(crate) fn impl_wrapper_enum(wrapper: &Ident, message_types: &[Path]) -> Resu
Ok(ts)
}

/// Generates the wrapper type enum, no bells or whistles.
pub(crate) fn impl_wrapper_to_wrapper_glue(
all_message: &Ident,
wrapper: &Ident,
message_types: &[Path],
) -> Result<TokenStream> {
// The message types are path based, each of them must finish with a type
// and as such we do this upfront.
let variants: Vec<_> = Result::from_iter(message_types.into_iter().map(|path| {
let ident = path
.segments
.last()
.ok_or_else(|| {
syn::Error::new(wrapper.span(), "Path is empty, but it must end with an identifier")
})
.map(|segment| segment.ident.clone());
ident
}))?;
let ts = quote! {
#[allow(missing_docs)]
#[derive(Clone)]
pub enum #wrapper {
#(
#variants ( #message_types ),
)*
}

#(
impl ::std::convert::From< #message_types > for #wrapper {
fn from(message: #message_types) -> Self {
#wrapper :: #variants ( message )
}
}
)*
};
Ok(ts)
}

pub(crate) fn impl_subsystem_sender(
ssf: &SubSysField,
wrapper_message: &Ident,
support_crate: &TokenStream,
outgoing_wrapper: &Ident,
Expand Down Expand Up @@ -274,8 +244,8 @@ pub(crate) fn impl_subsystem_sender(
impl<Message> SubsystemSender< Message > for #subsystem_sender_name
where
Message: Send + 'static,
#outgoing_wrapper: ::std::convert::From<Message>,
#all_messages_wrapper: ::std::convert::From<Message>,
#outgoing_wrapper: ::std::convert::From<Message> + Send,
#all_messages_wrapper: ::std::convert::From<#outgoing_wrapper> + Send,
{
async fn send_message(&mut self, msg: Message) {
self.channels.send_and_log_error(
Expand All @@ -289,10 +259,10 @@ pub(crate) fn impl_subsystem_sender(
async fn send_messages<I>(&mut self, msgs: I)
where
I: IntoIterator<Item=Message> + Send,
I::IntoIter: Iterator<Item=Message>,
I::IntoIter: Iterator<Item=Message> + Send,
{
for msg in msgs {
self.send_message( msg );
self.send_message( msg ).await;
}
}

Expand Down

0 comments on commit 462a93e

Please sign in to comment.