Skip to content

Revised Macro Refactor: Impl Block Support and Backward Compatibility (#43) #67

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ rust-version = "1.76.0"

[[example]]
name = "tracing"
path = "examples/tracing.rs"
path = "examples/trait_block/tracing.rs"
required-features = ["tracing-span-filter"]

[[example]]
name = "schema"
path = "examples/schema.rs"
path = "examples/trait_block/schema.rs"
required-features = ["schemars"]

[features]
Expand Down
40 changes: 40 additions & 0 deletions examples/impl_block/counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use restate_sdk::prelude::*;

const COUNT: &str = "count";

struct Counter;

#[restate_sdk::object]
impl Counter {
#[handler(shared)]
async fn get(&self, ctx: SharedObjectContext<'_>) -> Result<u64, TerminalError> {
Ok(ctx.get::<u64>(COUNT).await?.unwrap_or(0))
}

#[handler]
async fn add(&self, ctx: ObjectContext<'_>, val: u64) -> Result<u64, TerminalError> {
let current = ctx.get::<u64>(COUNT).await?.unwrap_or(0);
let new = current + val;
ctx.set(COUNT, new);
Ok(new)
}

#[handler]
async fn increment(&self, ctx: ObjectContext<'_>) -> Result<u64, TerminalError> {
self.add(ctx, 1).await
}

#[handler]
async fn reset(&self, ctx: ObjectContext<'_>) -> Result<(), TerminalError> {
ctx.clear(COUNT);
Ok(())
}
}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
HttpServer::new(Endpoint::builder().bind(Counter.serve()).build())
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
.await;
}
84 changes: 84 additions & 0 deletions examples/impl_block/cron.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use restate_sdk::prelude::*;
use std::time::Duration;

/// This example shows how to implement a periodic task, by invoking itself in a loop.
///
/// The `start()` handler schedules the first call to `run()`, and then each `run()` will re-schedule itself.
///
/// To "break" the loop, we use a flag we persist in state, which is removed when `stop()` is invoked.
/// Its presence determines whether the task is active or not.
///
/// To start it:
///
/// ```shell
/// $ curl -v http://localhost:8080/PeriodicTask/my-periodic-task/start
/// ```
struct PeriodicTask;

const ACTIVE: &str = "active";

#[restate_sdk::object]
impl PeriodicTask {
#[handler]
async fn start(&self, context: ObjectContext<'_>) -> Result<(), TerminalError> {
if context
.get::<bool>(ACTIVE)
.await?
.is_some_and(|enabled| enabled)
{
// If it's already activated, just do nothing
return Ok(());
}

// Schedule the periodic task
PeriodicTask::schedule_next(&context);

// Mark the periodic task as active
context.set(ACTIVE, true);

Ok(())
}

#[handler]
async fn stop(&self, context: ObjectContext<'_>) -> Result<(), TerminalError> {
// Remove the active flag
context.clear(ACTIVE);

Ok(())
}

#[handler]
async fn run(&self, context: ObjectContext<'_>) -> Result<(), TerminalError> {
if context.get::<bool>(ACTIVE).await?.is_none() {
// Task is inactive, do nothing
return Ok(());
}

// --- Periodic task business logic!
println!("Triggered the periodic task!");

// Schedule the periodic task
PeriodicTask::schedule_next(&context);

Ok(())
}
}

impl PeriodicTask {
fn schedule_next(context: &ObjectContext<'_>) {
// To schedule, create a client to the callee handler (in this case, we're calling ourselves)
context
.object_client::<PeriodicTaskClient>(context.key())
.run()
// And send with a delay
.send_after(Duration::from_secs(10));
}
}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
HttpServer::new(Endpoint::builder().bind(PeriodicTask.serve()).build())
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
.await;
}
34 changes: 34 additions & 0 deletions examples/impl_block/failures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use rand::RngCore;
use restate_sdk::prelude::*;

#[derive(Debug, thiserror::Error)]
#[error("I'm very bad, retry me")]
struct MyError;

struct FailureExample;

#[restate_sdk::service]
impl FailureExample {
#[handler(name = "doRun")]
async fn do_run(&self, context: Context<'_>) -> Result<(), TerminalError> {
context
.run::<_, _, ()>(|| async move {
if rand::thread_rng().next_u32() % 4 == 0 {
Err(TerminalError::new("Failed!!!"))?
}

Err(MyError)?
})
.await?;

Ok(())
}
}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
HttpServer::new(Endpoint::builder().bind(FailureExample.serve()).build())
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
.await;
}
20 changes: 20 additions & 0 deletions examples/impl_block/greeter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use restate_sdk::prelude::*;
use std::convert::Infallible;

struct Greeter;

#[restate_sdk::service]
impl Greeter {
#[handler]
async fn greet(&self, _ctx: Context<'_>, name: String) -> Result<String, Infallible> {
Ok(format!("Greetings {name}"))
}
}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
HttpServer::new(Endpoint::builder().bind(Greeter.serve()).build())
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
.await;
}
44 changes: 44 additions & 0 deletions examples/impl_block/run.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use restate_sdk::prelude::*;
use std::collections::HashMap;

struct RunExample(reqwest::Client);

#[restate_sdk::service]
impl RunExample {
#[handler]
async fn do_run(
&self,
context: Context<'_>,
) -> Result<Json<HashMap<String, String>>, HandlerError> {
let res = context
.run(|| async move {
let req = self.0.get("https://httpbin.org/ip").build()?;

let res = self
.0
.execute(req)
.await?
.json::<HashMap<String, String>>()
.await?;

Ok(Json::from(res))
})
.name("get_ip")
.await?
.into_inner();

Ok(res.into())
}
}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
HttpServer::new(
Endpoint::builder()
.bind(RunExample(reqwest::Client::new()).serve())
.build(),
)
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
.await;
}
File renamed without changes.
19 changes: 19 additions & 0 deletions examples/impl_block/services/my_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use restate_sdk::prelude::*;

pub struct MyService;

#[restate_sdk::service(vis = "pub(crate)")]
impl MyService {
#[handler]
async fn my_handler(&self, _ctx: Context<'_>, greeting: String) -> Result<String, HandlerError> {
Ok(format!("{greeting}!"))
}
}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
HttpServer::new(Endpoint::builder().bind(MyService.serve()).build())
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
.await;
}
36 changes: 36 additions & 0 deletions examples/impl_block/services/my_virtual_object.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use restate_sdk::prelude::*;

pub struct MyVirtualObject;

#[restate_sdk::object(vis = "pub(crate)")]
impl MyVirtualObject {
#[handler]
async fn my_handler(
&self,
ctx: ObjectContext<'_>,
greeting: String,
) -> Result<String, HandlerError> {
Ok(format!("Greetings {} {}", greeting, ctx.key()))
}

#[handler(shared)]
async fn my_concurrent_handler(
&self,
ctx: SharedObjectContext<'_>,
greeting: String,
) -> Result<String, HandlerError> {
Ok(format!("Greetings {} {}", greeting, ctx.key()))
}
}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
HttpServer::new(
Endpoint::builder()
.bind(MyVirtualObject.serve())
.build(),
)
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
.await;
}
32 changes: 32 additions & 0 deletions examples/impl_block/services/my_workflow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use restate_sdk::prelude::*;

pub struct MyWorkflow;

#[restate_sdk::workflow(vis = "pub(crate)")]
impl MyWorkflow {
#[handler]
async fn run(&self, _ctx: WorkflowContext<'_>, _req: String) -> Result<String, HandlerError> {
// implement workflow logic here

Ok(String::from("success"))
}

#[handler(shared)]
async fn interact_with_workflow(
&self,
_ctx: SharedWorkflowContext<'_>,
) -> Result<(), HandlerError> {
// implement interaction logic here
// e.g. resolve a promise that the workflow is waiting on

Ok(())
}
}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
HttpServer::new(Endpoint::builder().bind(MyWorkflow.serve()).build())
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
.await;
}
34 changes: 34 additions & 0 deletions examples/impl_block/tracing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use restate_sdk::prelude::*;
use std::time::Duration;
use tracing::info;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};

struct Greeter;

#[restate_sdk::service]
impl Greeter {
#[handler]
async fn greet(&self, ctx: Context<'_>, name: String) -> Result<String, HandlerError> {
info!("Before sleep");
ctx.sleep(Duration::from_secs(61)).await?; // More than suspension timeout to trigger replay
info!("After sleep");
Ok(format!("Greetings {name}"))
}
}

#[tokio::main]
async fn main() {
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info,restate_sdk=debug".into());
let replay_filter = restate_sdk::filter::ReplayAwareFilter;
tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::layer()
.with_filter(env_filter)
.with_filter(replay_filter),
)
.init();
HttpServer::new(Endpoint::builder().bind(Greeter.serve()).build())
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
.await;
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
3 changes: 3 additions & 0 deletions examples/trait_block/services/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod my_service;
pub mod my_virtual_object;
pub mod my_workflow;
File renamed without changes.
Loading
Loading