Skip to content

Commit

Permalink
Add sys_promise table, to debug promise results. (#1533)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored May 21, 2024
1 parent ad2db81 commit 1d7ac91
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 1 deletion.
3 changes: 2 additions & 1 deletion crates/storage-query-datafusion/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,9 @@ impl QueryContext {
crate::idempotency::register_self(
&ctx,
partition_selector.clone(),
partition_store_manager,
partition_store_manager.clone(),
)?;
crate::promise::register_self(&ctx, partition_selector.clone(), partition_store_manager)?;

let ctx = ctx
.datafusion_context
Expand Down
1 change: 1 addition & 0 deletions crates/storage-query-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod journal;
mod keyed_service_status;
mod partition_store_scanner;
mod physical_optimizer;
mod promise;
mod service;
mod state;
mod table_macro;
Expand Down
15 changes: 15 additions & 0 deletions crates/storage-query-datafusion/src/promise/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod row;
mod schema;
mod table;

pub(crate) use table::register_self;
55 changes: 55 additions & 0 deletions crates/storage-query-datafusion/src/promise/row.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use super::schema::PromiseBuilder;

use crate::table_util::format_using;
use restate_storage_api::promise_table::{OwnedPromiseRow, PromiseState};
use restate_types::errors::InvocationError;
use restate_types::identifiers::WithPartitionKey;
use restate_types::journal::EntryResult;

#[inline]
pub(crate) fn append_promise_row(
builder: &mut PromiseBuilder,
output: &mut String,
owned_promise_row: OwnedPromiseRow,
) {
let mut row = builder.row();
row.partition_key(owned_promise_row.service_id.partition_key());

row.service_name(&owned_promise_row.service_id.service_name);
row.service_key(&owned_promise_row.service_id.key);
row.key(&owned_promise_row.key);

match owned_promise_row.metadata.state {
PromiseState::Completed(c) => {
row.completed(true);
match c {
EntryResult::Success(s) => {
row.completion_success_value(&s);
if row.is_completion_success_value_utf8_defined() {
if let Ok(str) = std::str::from_utf8(&s) {
row.completion_success_value_utf8(str);
}
}
}
EntryResult::Failure(c, m) => {
if row.is_completion_failure_defined() {
row.completion_failure(format_using(output, &InvocationError::new(c, m)))
}
}
}
}
PromiseState::NotCompleted(_) => {
row.completed(false);
}
}
}
29 changes: 29 additions & 0 deletions crates/storage-query-datafusion/src/promise/schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#![allow(dead_code)]

use crate::table_macro::*;

use datafusion::arrow::datatypes::DataType;

define_table!(promise(
partition_key: DataType::UInt64,

service_name: DataType::LargeUtf8,
service_key: DataType::LargeUtf8,

key: DataType::LargeUtf8,
completed: DataType::Boolean,

completion_success_value: DataType::LargeBinary,
completion_success_value_utf8: DataType::LargeUtf8,
completion_failure: DataType::LargeUtf8
));
86 changes: 86 additions & 0 deletions crates/storage-query-datafusion/src/promise/table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::fmt::Debug;
use std::ops::RangeInclusive;
use std::sync::Arc;

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use futures::{Stream, StreamExt};
use tokio::sync::mpsc::Sender;

use restate_partition_store::{PartitionStore, PartitionStoreManager};
use restate_storage_api::promise_table::{OwnedPromiseRow, ReadOnlyPromiseTable};
use restate_types::identifiers::PartitionKey;

use super::row::append_promise_row;
use super::schema::PromiseBuilder;
use crate::context::{QueryContext, SelectPartitions};
use crate::partition_store_scanner::{LocalPartitionsScanner, ScanLocalPartition};
use crate::table_providers::PartitionedTableProvider;

pub(crate) fn register_self(
ctx: &QueryContext,
partition_selector: impl SelectPartitions,
partition_store_manager: PartitionStoreManager,
) -> datafusion::common::Result<()> {
let table = PartitionedTableProvider::new(
partition_selector,
PromiseBuilder::schema(),
LocalPartitionsScanner::new(partition_store_manager, PromiseScanner),
);

ctx.as_ref()
.register_table("sys_promise", Arc::new(table))
.map(|_| ())
}

#[derive(Clone, Debug)]
struct PromiseScanner;

impl ScanLocalPartition for PromiseScanner {
async fn scan_partition_store(
mut partition_store: PartitionStore,
tx: Sender<Result<RecordBatch, datafusion::error::DataFusionError>>,
range: RangeInclusive<PartitionKey>,
projection: SchemaRef,
) {
for_each_state(projection, tx, partition_store.all_promises(range)).await;
}
}

async fn for_each_state(
schema: SchemaRef,
tx: Sender<datafusion::common::Result<RecordBatch>>,
rows: impl Stream<Item = restate_storage_api::Result<OwnedPromiseRow>>,
) {
let mut builder = PromiseBuilder::new(schema.clone());
let mut temp = String::new();

tokio::pin!(rows);
while let Some(Ok(owned_promise_row)) = rows.next().await {
append_promise_row(&mut builder, &mut temp, owned_promise_row);
if builder.full() {
let batch = builder.finish();
if tx.send(Ok(batch)).await.is_err() {
// not sure what to do here?
// the other side has hung up on us.
// we probably don't want to panic, is it will cause the entire process to exit
return;
}
builder = PromiseBuilder::new(schema.clone());
}
}
if !builder.empty() {
let result = builder.finish();
let _ = tx.send(Ok(result)).await;
}
}

0 comments on commit 1d7ac91

Please sign in to comment.