37 changes: 35 additions & 2 deletions src/adapter/src/coord/sequencer/inner/copy_from.rs
@@ -13,7 +13,7 @@ use mz_adapter_types::connection::ConnectionId;
use mz_ore::cast::CastInto;
use mz_persist_client::batch::ProtoBatch;
use mz_pgcopy::CopyFormatParams;
-use mz_repr::{CatalogItemId, Datum, RowArena};
+use mz_repr::{CatalogItemId, Datum, NotNullViolation, RowArena};
use mz_sql::plan::{self, CopyFromFilter, CopyFromSource, HirScalarExpr};
use mz_sql::session::metadata::SessionMetadata;
use mz_storage_client::client::TableData;
@@ -86,7 +86,7 @@ impl Coordinator {
CopyFormatParams::Parquet => mz_storage_types::oneshot_sources::ContentFormat::Parquet,
CopyFormatParams::Text(_) | CopyFormatParams::Binary => {
mz_ore::soft_panic_or_log!("unsupported formats should be rejected in planning");
ctx.retire(Err(AdapterError::Unsupported("COPY FROM URL format")));
ctx.retire(Err(AdapterError::Unsupported("COPY FROM URL/S3 format")));
return;
}
};
@@ -160,6 +160,39 @@ impl Coordinator {
})
});
let source_mfp = return_if_err!(source_mfp, ctx);

// Validate that all non-nullable columns in the target table will be populated.
let target_desc = dest_table.desc.latest();
for (col_idx, col_type) in target_desc.iter_types().enumerate() {
if !col_type.nullable {
// Check what value the MFP will produce for this column position.
if let Some(&projection_idx) = source_mfp.projection.get(col_idx) {
// If the projection index is beyond the input arity, it references an expression.
let input_arity = source_mfp.input_arity;
if projection_idx >= input_arity {
let expr_idx = projection_idx - input_arity;
if let Some(expr) = source_mfp.expressions.get(expr_idx) {
// Check if the expression is a NULL literal.
// A NULL literal is represented as Literal(Ok(empty_row), _)
if matches!(expr, mz_expr::MirScalarExpr::Literal(Ok(row), _) if row.iter().next().map(|d| d.is_null()).unwrap_or(false))
{
let col_name = target_desc.get_name(col_idx);
return ctx.retire(Err(AdapterError::ConstraintViolation(
NotNullViolation(col_name.clone()),
)));
}
}
}
} else {
// If there's no projection for this column, that's a validation error.
let col_name = target_desc.get_name(col_idx);
return ctx.retire(Err(AdapterError::ConstraintViolation(NotNullViolation(
col_name.clone(),
))));
}
}
}

Review thread on the `matches!` check above:

Contributor:

This just checks for a null literal, but there can be a more complicated expression in the table's default for the column that evaluates to null. For example, if I modify t1_not_null's definition in copy-from-s3-minio.td to the following, then we still crash:

> CREATE TABLE t1_not_null (a text, b int, c float NOT NULL DEFAULT 5 + null);

We could maybe do constant folding on the expression. At the HIR level, this can be achieved by calling simplify_to_literal. There is an MIR expression here, not an HIR expression, but I think this validation could be done during HIR planning instead of here in sequencing; see my later comment.

Contributor (Author):

Hmm, this is true, but validation during HIR planning is a bit tricky, as mentioned in my other reply. It does seem that maybe this should be done at actual decoding time, but the error handling there doesn't really allow for non-fatal errors as of now, so it would take a bit of a redesign. Is there a good way to do constant folding on an MIR expression?

Contributor:

For MIR, you can do .reduce(&[]).

Contributor:

> It does seem that maybe this should be done at actual decoding time, but the error handling there doesn't really allow for non-fatal errors as of now, so it would take a bit of a redesign.

Well, solving this seems unavoidable, as far as I can see. It seems like quite a serious problem if we just crash when there is a null in the input data in the wrong column. But I'm a bit out of my expertise at this point, so you might want to consult someone on the Sources & Sinks Team.
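A minimal sketch of the reviewer's constant-folding suggestion, assuming `MirScalarExpr::reduce` takes the input column types as the quoted `.reduce(&[])` call implies; this is illustrative and not part of the PR:

```rust
// Illustrative only, not the PR's code: constant-fold the default expression
// before the null check, so a default like `5 + null` also collapses to a
// null literal. Assumes `expr: &mz_expr::MirScalarExpr` from the loop above.
let mut folded = expr.clone();
folded.reduce(&[]); // constant folding; a bare default sees no input columns
if matches!(
    &folded,
    mz_expr::MirScalarExpr::Literal(Ok(row), _)
        if row.iter().next().map_or(false, |d| d.is_null())
) {
    // Report the NotNullViolation exactly as in the branch above.
}
```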

let shape = ContentShape {
source_desc,
source_mfp,
22 changes: 18 additions & 4 deletions src/storage-operators/src/oneshot_source.rs
@@ -74,6 +74,7 @@ use mz_ore::cast::CastFrom;
use mz_persist_client::Diagnostics;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_types::Codec;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{DatumVec, GlobalId, Row, RowArena, Timestamp};
use mz_storage_types::StorageDiff;
@@ -521,7 +522,7 @@
{
let persist_location = collection_meta.persist_location.clone();
let shard_id = collection_meta.data_shard;
-let collection_desc = collection_meta.relation_desc.clone();
+let collection_desc = Arc::new(collection_meta.relation_desc.clone());

let mut builder =
AsyncOperatorBuilder::new("CopyFrom-stage_batches".to_string(), scope.clone());
@@ -545,7 +546,7 @@
let write_handle = persist_client
.open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
shard_id,
-Arc::new(collection_desc),
+Arc::clone(&collection_desc),
Arc::new(UnitSchema),
persist_diagnostics,
)
@@ -568,8 +569,8 @@
// Pull Rows off our stream and stage them into a Batch.
for maybe_row in row_batch {
match maybe_row {
-// Happy path, add the Row to our batch!
-Ok(row) => {
+// Happy path, add the Row to our batch!
+Ok(row) if Row::validate(&row, &*collection_desc).is_ok() => {
let data = SourceData(Ok(row));
batch_builder
.add(&data, &(), &lower, &1)
@@ -590,6 +591,19 @@
.give(&proto_batch_cap, Err(err).context("stage batches"));
return;
}
_ => {
let err =
StorageErrorXKind::invalid_record_batch("invalid row for collection");
// Clean up our in-progress batch so we don't leak data.
let batch = batch_builder
.finish(upper)
.await
.expect("failed to cleanup batch");
batch.delete().await;
proto_batch_handle
.give(&proto_batch_cap, Err(err).context("stage batches"));
return;
}
}
}
}
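The guard's behavior depends on `Row::validate`, which this diff does not show. As a rough illustration, a nullability-only check against the target `RelationDesc` could take the following shape (hypothetical; the real `Row::validate` may check more than nullability):

```rust
use mz_repr::{RelationDesc, Row};

// Hypothetical sketch of a nullability check against a RelationDesc; the
// PR's actual Row::validate is not shown in this diff and may differ.
fn validate_not_null(row: &Row, desc: &RelationDesc) -> Result<(), String> {
    for (datum, (name, ty)) in row.iter().zip(desc.iter()) {
        if datum.is_null() && !ty.nullable {
            return Err(format!(
                "null value in column {} violates not-null constraint",
                name
            ));
        }
    }
    Ok(())
}
```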
37 changes: 34 additions & 3 deletions test/testdrive/copy-from-s3-minio.td
@@ -54,9 +54,40 @@ contains:failed to decode Row
! COPY INTO t1_single_col FROM '${1_csv_url}' (FORMAT CSV);
contains:wrong number of columns

-# disabled as MZ panics database-issues#9886
-# ! COPY INTO t1_not_null FROM '${1_csv_url}' (FORMAT CSV);
-# contains:some error that's not a panic
+# Cannot perform this COPY because column c is NOT NULL, has no default value, and is not being copied into.
+! COPY INTO t1_not_null (a, b) FROM '${1_csv_url}' (FORMAT CSV);
+contains:violates not-null constraint

# The last row of this CSV contains a NULL value in the third column.
$ s3-file-upload bucket=copytos3 key=csv2/nulls.csv repeat=1
foo,100,1.0
foo,100,1.0
foo,100,1.0
foo,100,1.0
foo,100,1.0
foo,100,1.0
foo,100,1.0
foo,100,1.0
foo,100,1.0
foo,100,1.0
foo,100,1.0
foo,100,1.0
foo,100,1.0
foo,100,1.0
foo,100,1.0
foo,100,1.0
foo,100,1.0
foo,100,

$ s3-set-presigned-url bucket=copytos3 key=csv2/nulls.csv var-name=nulls_csv_url

# The final row with the null value in "column c" should cause an error as c is NOT NULL
! COPY INTO t1_not_null (a, b, c) FROM '${nulls_csv_url}' (FORMAT CSV);
contains:invalid row for collection

# Make sure that the entire transaction is rejected
> SELECT COUNT(*) FROM t1_not_null;
0

> COPY INTO t1_fun_house (z, y) FROM '${1_csv_url}' (FORMAT CSV);
> SELECT * from t1_fun_house;