Skip to content

Commit

Permalink
Set up Tokio runtime in main()
Browse files Browse the repository at this point in the history
Each entrypoint to the container bits sets up a tokio runtime,
which is inefficient and duplicative.  We're also likely
to start using Rust async in more places.

Instead, create a Tokio runtime early in our `main`, and
change the CLI entrypoint to be an `async fn`.

The other setup of a runtime we have is deep inside the
sysroot upgrader bits, also for the container.  In this
case we actually have another thread (distinct from the main one
where we set up Tokio) created by C/C++, so we need to pass
a `tokio::runtime::Handle` across, and call `enter()` on it
to set up the thread local bindings to access tokio async
from there.

I was initially looking at properly handling `GCancellable`
with tokio and wanted to clean this up first.
  • Loading branch information
cgwalters committed Aug 20, 2021
1 parent 1c90027 commit 2b05cbc
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 35 deletions.
12 changes: 4 additions & 8 deletions rust/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,15 @@

// SPDX-License-Identifier: Apache-2.0 OR MIT

use anyhow::{Context, Result};
use anyhow::Result;

/// Main entrypoint for container
pub fn entrypoint(args: &[&str]) -> Result<()> {
pub async fn entrypoint(args: &[&str]) -> Result<i32> {
// Right now we're only exporting the `container` bits, not tar. So inject that argument.
// And we also need to skip the main arg and the `ex-container` arg.
let args = ["rpm-ostree", "container"]
.iter()
.chain(args.iter().skip(2));
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("Failed to build tokio runtime")?
.block_on(async { ostree_ext::cli::run_from_iter(args).await })?;
Ok(())
ostree_ext::cli::run_from_iter(args).await?;
Ok(0)
}
11 changes: 11 additions & 0 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,15 @@ pub mod ffi {
fn modularity_entrypoint(args: &Vec<String>) -> Result<()>;
}

// tokio_ffi.rs
extern "Rust" {
type TokioHandle;
type TokioEnterGuard<'a>;

fn tokio_handle_get() -> Box<TokioHandle>;
unsafe fn enter<'a>(self: &'a TokioHandle) -> Box<TokioEnterGuard<'a>>;
}

// scripts.rs
extern "Rust" {
fn script_is_ignored(pkg: &str, script: &str) -> bool;
Expand Down Expand Up @@ -644,6 +653,8 @@ use passwd::*;
mod console_progress;
pub(crate) use self::console_progress::*;
mod progress;
mod tokio_ffi;
pub(crate) use self::tokio_ffi::*;
mod scripts;
pub(crate) use self::scripts::*;
mod sysroot_upgrade;
Expand Down
43 changes: 26 additions & 17 deletions rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,27 @@ fn usroverlay(args: &[&str]) -> Result<()> {
.context("Failed to execute ostree admin unlock")
}

// And now we've done process global initialization, we have a tokio runtime setup; process the command line.
async fn inner_async_main(args: &[&str]) -> Result<i32> {
// It is only recently that our main() function is in Rust, calling
// into C++ as a library. As of right now, the only Rust commands
// are hidden, i.e. should not appear in --help. So we just recognize
// those, and if there's something we don't know about, invoke the C++
// main().
match args.get(1).copied() {
// Add custom Rust commands here, and also in `libmain.cxx` if user-visible.
Some("countme") => rpmostree_rust::countme::entrypoint(&args).map(|_| 0),
Some("cliwrap") => rpmostree_rust::cliwrap::entrypoint(&args).map(|_| 0),
Some("ex-container") => rpmostree_rust::container::entrypoint(&args).await,
// The `unlock` is a hidden alias for "ostree CLI compatibility"
Some("usroverlay") | Some("unlock") => usroverlay(&args).map(|_| 0),
_ => {
// Otherwise fall through to C++ main().
Ok(rpmostree_rust::ffi::rpmostree_main(&args)?)
}
}
}

/// The real main function returns a `Result<>`.
fn inner_main() -> Result<i32> {
if std::env::var("RPMOSTREE_GDB_HOOK").is_ok() {
Expand Down Expand Up @@ -54,23 +75,11 @@ fn inner_main() -> Result<i32> {
.collect();
let args = args?;
let args: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
// It is only recently that our main() function is in Rust, calling
// into C++ as a library. As of right now, the only Rust commands
// are hidden, i.e. should not appear in --help. So we just recognize
// those, and if there's something we don't know about, invoke the C++
// main().
match args.get(1).copied() {
// Add custom Rust commands here, and also in `libmain.cxx` if user-visible.
Some("countme") => rpmostree_rust::countme::entrypoint(&args).map(|_| 0),
Some("cliwrap") => rpmostree_rust::cliwrap::entrypoint(&args).map(|_| 0),
Some("ex-container") => rpmostree_rust::container::entrypoint(&args).map(|_| 0),
// The `unlock` is a hidden alias for "ostree CLI compatibility"
Some("usroverlay") | Some("unlock") => usroverlay(&args).map(|_| 0),
_ => {
// Otherwise fall through to C++ main().
Ok(rpmostree_rust::ffi::rpmostree_main(&args)?)
}
}
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("Failed to build tokio runtime")?
.block_on(async { inner_async_main(&args).await })
}

fn print_error(e: anyhow::Error) {
Expand Down
13 changes: 3 additions & 10 deletions rust/src/sysroot_upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

use crate::cxxrsutil::*;
use crate::ffi::ContainerImport;
use anyhow::{Context, Result};
use std::convert::TryInto;
use std::pin::Pin;
use tokio::runtime::Handle;

/// Import ostree commit in container image using ostree-rs-ext's API.
pub(crate) fn import_container(
Expand All @@ -16,7 +16,7 @@ pub(crate) fn import_container(
// TODO: take a GCancellable and monitor it, and drop the import task (which is how async cancellation works in Rust).
let repo = repo.gobj_wrap();
let imgref = imgref.as_str().try_into()?;
let imported = build_runtime()?
let imported = Handle::current()
.block_on(async { ostree_ext::container::import(&repo, &imgref, None).await })?;
Ok(Box::new(ContainerImport {
ostree_commit: imported.ostree_commit,
Expand All @@ -27,14 +27,7 @@ pub(crate) fn import_container(
/// Fetch the image digest for `imgref` using ostree-rs-ext's API.
pub(crate) fn fetch_digest(imgref: String) -> CxxResult<String> {
let imgref = imgref.as_str().try_into()?;
let digest = build_runtime()?
let digest = Handle::current()
.block_on(async { ostree_ext::container::fetch_manifest_info(&imgref).await })?;
Ok(digest.manifest_digest)
}

fn build_runtime() -> Result<tokio::runtime::Runtime> {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("Failed to build tokio runtime")
}
16 changes: 16 additions & 0 deletions rust/src/tokio_ffi.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//! Helpers to bridge tokio to C++

// SPDX-License-Identifier: Apache-2.0 OR MIT

pub(crate) struct TokioHandle(tokio::runtime::Handle);
pub(crate) struct TokioEnterGuard<'a>(tokio::runtime::EnterGuard<'a>);

pub(crate) fn tokio_handle_get() -> Box<TokioHandle> {
Box::new(TokioHandle(tokio::runtime::Handle::current()))
}

impl TokioHandle {
pub(crate) fn enter(&self) -> Box<TokioEnterGuard> {
Box::new(TokioEnterGuard(self.0.enter()))
}
}
9 changes: 9 additions & 0 deletions src/daemon/rpmostreed-transaction.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <systemd/sd-journal.h>
#include <systemd/sd-login.h>
#include <stdexcept>
#include <optional>

#include "rpmostreed-transaction.h"
#include "rpmostreed-errors.h"
Expand All @@ -47,6 +48,8 @@ struct _RpmostreedTransactionPrivate {
char *agent_id;
char *sd_unit;

std::optional<rust::Box<rpmostreecxx::TokioHandle>> tokio_handle;

gint64 last_progress_journal;

gboolean redirect_output;
Expand Down Expand Up @@ -341,6 +344,8 @@ transaction_execute_thread (GTask *task,
* anyways.
*/
g_main_context_push_thread_default (mctx);
// Further, we join the main Tokio async runtime.
auto guard = (*priv->tokio_handle)->enter();

if (clazz->execute != NULL)
{
Expand Down Expand Up @@ -512,6 +517,8 @@ transaction_finalize (GObject *object)
if (priv->watch_id > 0)
g_bus_unwatch_name (priv->watch_id);

priv->tokio_handle.~optional();

g_hash_table_destroy (priv->peer_connections);

g_free (priv->client_description);
Expand Down Expand Up @@ -823,6 +830,8 @@ rpmostreed_transaction_init (RpmostreedTransaction *self)
g_direct_equal,
g_object_unref,
NULL);

self->priv->tokio_handle = rpmostreecxx::tokio_handle_get();
}

gboolean
Expand Down

0 comments on commit 2b05cbc

Please sign in to comment.