Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set up Tokio runtime in main() #3082

Merged
merged 1 commit into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions rust/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,15 @@

// SPDX-License-Identifier: Apache-2.0 OR MIT

use anyhow::{Context, Result};
use anyhow::Result;

/// Main entrypoint for container
pub fn entrypoint(args: &[&str]) -> Result<()> {
pub async fn entrypoint(args: &[&str]) -> Result<i32> {
// Right now we're only exporting the `container` bits, not tar. So inject that argument.
// And we also need to skip the main arg and the `ex-container` arg.
let args = ["rpm-ostree", "container"]
.iter()
.chain(args.iter().skip(2));
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("Failed to build tokio runtime")?
.block_on(async { ostree_ext::cli::run_from_iter(args).await })?;
Ok(())
ostree_ext::cli::run_from_iter(args).await?;
Ok(0)
}
11 changes: 11 additions & 0 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,15 @@ pub mod ffi {
fn modularity_entrypoint(args: &Vec<String>) -> Result<()>;
}

// tokio_ffi.rs
extern "Rust" {
type TokioHandle;
type TokioEnterGuard<'a>;

fn tokio_handle_get() -> Box<TokioHandle>;
unsafe fn enter<'a>(self: &'a TokioHandle) -> Box<TokioEnterGuard<'a>>;
}

// scripts.rs
extern "Rust" {
fn script_is_ignored(pkg: &str, script: &str) -> bool;
Expand Down Expand Up @@ -644,6 +653,8 @@ use passwd::*;
mod console_progress;
pub(crate) use self::console_progress::*;
mod progress;
mod tokio_ffi;
pub(crate) use self::tokio_ffi::*;
mod scripts;
pub(crate) use self::scripts::*;
mod sysroot_upgrade;
Expand Down
43 changes: 26 additions & 17 deletions rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,27 @@ fn usroverlay(args: &[&str]) -> Result<()> {
.context("Failed to execute ostree admin unlock")
}

// And now we've done process global initialization, we have a tokio runtime setup; process the command line.
async fn inner_async_main(args: &[&str]) -> Result<i32> {
// It is only recently that our main() function is in Rust, calling
// into C++ as a library. As of right now, the only Rust commands
// are hidden, i.e. should not appear in --help. So we just recognize
// those, and if there's something we don't know about, invoke the C++
// main().
match args.get(1).copied() {
// Add custom Rust commands here, and also in `libmain.cxx` if user-visible.
Some("countme") => rpmostree_rust::countme::entrypoint(&args).map(|_| 0),
Some("cliwrap") => rpmostree_rust::cliwrap::entrypoint(&args).map(|_| 0),
Some("ex-container") => rpmostree_rust::container::entrypoint(&args).await,
// The `unlock` is a hidden alias for "ostree CLI compatibility"
Some("usroverlay") | Some("unlock") => usroverlay(&args).map(|_| 0),
_ => {
// Otherwise fall through to C++ main().
Ok(rpmostree_rust::ffi::rpmostree_main(&args)?)
}
}
}

/// The real main function returns a `Result<>`.
fn inner_main() -> Result<i32> {
if std::env::var("RPMOSTREE_GDB_HOOK").is_ok() {
Expand Down Expand Up @@ -54,23 +75,11 @@ fn inner_main() -> Result<i32> {
.collect();
let args = args?;
let args: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
// It is only recently that our main() function is in Rust, calling
// into C++ as a library. As of right now, the only Rust commands
// are hidden, i.e. should not appear in --help. So we just recognize
// those, and if there's something we don't know about, invoke the C++
// main().
match args.get(1).copied() {
// Add custom Rust commands here, and also in `libmain.cxx` if user-visible.
Some("countme") => rpmostree_rust::countme::entrypoint(&args).map(|_| 0),
Some("cliwrap") => rpmostree_rust::cliwrap::entrypoint(&args).map(|_| 0),
Some("ex-container") => rpmostree_rust::container::entrypoint(&args).map(|_| 0),
// The `unlock` is a hidden alias for "ostree CLI compatibility"
Some("usroverlay") | Some("unlock") => usroverlay(&args).map(|_| 0),
_ => {
// Otherwise fall through to C++ main().
Ok(rpmostree_rust::ffi::rpmostree_main(&args)?)
}
}
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("Failed to build tokio runtime")?
.block_on(async { inner_async_main(&args).await })
}

fn print_error(e: anyhow::Error) {
Expand Down
13 changes: 3 additions & 10 deletions rust/src/sysroot_upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

use crate::cxxrsutil::*;
use crate::ffi::ContainerImport;
use anyhow::{Context, Result};
use std::convert::TryInto;
use std::pin::Pin;
use tokio::runtime::Handle;

/// Import ostree commit in container image using ostree-rs-ext's API.
pub(crate) fn import_container(
Expand All @@ -16,7 +16,7 @@ pub(crate) fn import_container(
// TODO: take a GCancellable and monitor it, and drop the import task (which is how async cancellation works in Rust).
let repo = repo.gobj_wrap();
let imgref = imgref.as_str().try_into()?;
let imported = build_runtime()?
let imported = Handle::current()
Copy link
Contributor

@lucab lucab Aug 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may have misunderstood the rest of the FFI changes around. I would expect this function and the one below to take a TokioEnterGuard (or maybe directly an Handle) to make sure they are always called beneath a tokio runtime.
Did I misunderstand the surrounding changes on the C++ side, or did you just forget to plumb a new input argument here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could plumb through the Handle but I think this is a fine pattern; Handle::current() will panic if not called on a tokio runtime.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, I forsee us using more async on the Rust side, and it'd be annoying to have to thread another argument through all the API calls to do it. Tokio already has the runtime in a thread-local for precisely this kind of thing (AIUI). It's very unlikely we somehow fail to set up the runtime on the worker thread. (And eventually, I think we'll move the worker thread to be spawned by Rust, which would solve this problem too)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh ok, I thought you exposed the tokio_handle_get() in order to move from a runtime panic to a compiler-enforced safety net.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's more from the other side, we need it because the C++ side needs to hold onto the Handle reference in order to pass it to the new C++ thread and install it there.

.block_on(async { ostree_ext::container::import(&repo, &imgref, None).await })?;
Ok(Box::new(ContainerImport {
ostree_commit: imported.ostree_commit,
Expand All @@ -27,14 +27,7 @@ pub(crate) fn import_container(
/// Fetch the image digest for `imgref` using ostree-rs-ext's API.
pub(crate) fn fetch_digest(imgref: String) -> CxxResult<String> {
let imgref = imgref.as_str().try_into()?;
let digest = build_runtime()?
let digest = Handle::current()
.block_on(async { ostree_ext::container::fetch_manifest_info(&imgref).await })?;
Ok(digest.manifest_digest)
}

fn build_runtime() -> Result<tokio::runtime::Runtime> {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("Failed to build tokio runtime")
}
16 changes: 16 additions & 0 deletions rust/src/tokio_ffi.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//! Helpers to bridge tokio to C++

// SPDX-License-Identifier: Apache-2.0 OR MIT

pub(crate) struct TokioHandle(tokio::runtime::Handle);
pub(crate) struct TokioEnterGuard<'a>(tokio::runtime::EnterGuard<'a>);

pub(crate) fn tokio_handle_get() -> Box<TokioHandle> {
Box::new(TokioHandle(tokio::runtime::Handle::current()))
}

impl TokioHandle {
pub(crate) fn enter(&self) -> Box<TokioEnterGuard> {
Box::new(TokioEnterGuard(self.0.enter()))
}
}
9 changes: 9 additions & 0 deletions src/daemon/rpmostreed-transaction.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <systemd/sd-journal.h>
#include <systemd/sd-login.h>
#include <stdexcept>
#include <optional>

#include "rpmostreed-transaction.h"
#include "rpmostreed-errors.h"
Expand All @@ -47,6 +48,8 @@ struct _RpmostreedTransactionPrivate {
char *agent_id;
char *sd_unit;

std::optional<rust::Box<rpmostreecxx::TokioHandle>> tokio_handle;

gint64 last_progress_journal;

gboolean redirect_output;
Expand Down Expand Up @@ -341,6 +344,8 @@ transaction_execute_thread (GTask *task,
* anyways.
*/
g_main_context_push_thread_default (mctx);
// Further, we join the main Tokio async runtime.
auto guard = (*priv->tokio_handle)->enter();

if (clazz->execute != NULL)
{
Expand Down Expand Up @@ -512,6 +517,8 @@ transaction_finalize (GObject *object)
if (priv->watch_id > 0)
g_bus_unwatch_name (priv->watch_id);

priv->tokio_handle.~optional();

g_hash_table_destroy (priv->peer_connections);

g_free (priv->client_description);
Expand Down Expand Up @@ -823,6 +830,8 @@ rpmostreed_transaction_init (RpmostreedTransaction *self)
g_direct_equal,
g_object_unref,
NULL);

self->priv->tokio_handle = rpmostreecxx::tokio_handle_get();
}

gboolean
Expand Down