Skip to content

Commit

Permalink
DataflowError backward compat
Browse files Browse the repository at this point in the history
  • Loading branch information
teskje committed Sep 21, 2023
1 parent f176f13 commit b57dcde
Show file tree
Hide file tree
Showing 5 changed files with 263 additions and 8 deletions.
1 change: 1 addition & 0 deletions src/storage-types/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ fn main() {
"storage-types/src/parameters.proto",
"storage-types/src/sinks.proto",
"storage-types/src/sources.proto",
"storage-types/src/sources_legacy.proto",
"storage-types/src/sources/encoding.proto",
],
&[".."],
Expand Down
1 change: 1 addition & 0 deletions src/storage-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,4 @@ pub mod instances;
pub mod parameters;
pub mod sinks;
pub mod sources;
pub mod sources_legacy;
15 changes: 7 additions & 8 deletions src/storage-types/src/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use crate::instances::StorageInstanceId;
use crate::sources::encoding::{DataEncoding, DataEncodingInner, SourceDataEncoding};
use crate::sources::proto_ingestion_description::{ProtoSourceExport, ProtoSourceImport};
use crate::sources::proto_load_generator_source_connection::Generator as ProtoGenerator;
use crate::sources_legacy::{decode_dataflow_error_with_fallback, decode_source_data_with_fallback};

pub mod encoding;

Expand Down Expand Up @@ -2681,14 +2682,14 @@ impl Codec for SourceData {
}

fn encode<B: BufMut>(&self, buf: &mut B) {
self.into_proto()
let proto: ProtoSourceData = self.into_proto();
proto
.encode(buf)
.expect("no required fields means no initialization errors");
}

fn decode(buf: &[u8]) -> Result<Self, String> {
let proto = ProtoSourceData::decode(buf).map_err(|err| err.to_string())?;
proto.into_rust().map_err(|err| err.to_string())
decode_source_data_with_fallback(buf)
}
}

Expand Down Expand Up @@ -2722,7 +2723,8 @@ impl<'a> PartEncoder<'a, SourceData> for SourceDataEncoder<'a> {
for encoder in self.ok.col_encoders() {
encoder.encode_default();
}
let err = err.into_proto().encode_to_vec();
let err: ProtoDataflowError = err.into_proto();
let err = err.encode_to_vec();
ColumnPush::<Option<Vec<u8>>>::push(self.err, Some(err.as_slice()));
}
}
Expand Down Expand Up @@ -2757,10 +2759,7 @@ impl<'a> PartDecoder<'a, SourceData> for SourceDataDecoder<'a> {
}
}
(false, Some(err)) => {
let err = ProtoDataflowError::decode(err)
.expect("proto should be valid")
.into_rust()
.expect("error should be valid");
let err = decode_dataflow_error_with_fallback(err).expect("proto should be valid");
val.0 = Err(err);
}
(true, Some(_)) | (false, None) => {
Expand Down
53 changes: 53 additions & 0 deletions src/storage-types/src/sources_legacy.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

syntax = "proto3";

import "expr/src/scalar.proto";
import "repr/src/row.proto";
import "storage-types/src/errors.proto";

package mz_storage_types.sources_legacy;

message ProtoSourceDataLegacy {
oneof kind {
mz_repr.row.ProtoRow ok = 1;
ProtoDataflowErrorLegacy err = 2;
}
}

message ProtoDataflowErrorLegacy {
oneof kind {
errors.ProtoDecodeError decode_error = 1;
mz_expr.scalar.ProtoEvalError eval_error = 2;
errors.ProtoSourceError source_error = 3;
ProtoEnvelopeErrorV1Legacy envelope_error_v1 = 4;
}
}

message ProtoEnvelopeErrorV1Legacy {
oneof kind {
string debezium = 1;
ProtoUpsertErrorLegacy upsert = 2;
string flat = 3;
}
}

message ProtoUpsertErrorLegacy {
oneof kind {
errors.ProtoDecodeError key_decode = 1;
ProtoUpsertValueErrorLegacy value = 2;
errors.ProtoUpsertNullKeyError null_key = 3;
}
}

message ProtoUpsertValueErrorLegacy {
ProtoDataflowErrorLegacy inner = 1;
mz_repr.row.ProtoRow for_key = 2;
}
201 changes: 201 additions & 0 deletions src/storage-types/src/sources_legacy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use mz_proto::{ProtoType, RustType, TryFromProtoError};
use prost::Message;

use crate::errors::{
DataflowError, EnvelopeError, ProtoDataflowError, UpsertError, UpsertValueError,
};
use crate::sources::{ProtoSourceData, SourceData};

include!(concat!(
env!("OUT_DIR"),
"/mz_storage_types.sources_legacy.rs"
));

pub(crate) fn decode_source_data_with_fallback(buf: &[u8]) -> Result<SourceData, String> {
match ProtoSourceData::decode(buf) {
Ok(proto) => proto.into_rust().map_err(|err| err.to_string()),
// Try to fall back to legacy encoding.
Err(_) => {
let proto = ProtoSourceDataLegacy::decode(buf).map_err(|err| err.to_string())?;
proto.into_rust().map_err(|err| err.to_string())
}
}
}

pub(crate) fn decode_dataflow_error_with_fallback(buf: &[u8]) -> Result<DataflowError, String> {
match ProtoDataflowError::decode(buf) {
Ok(proto) => proto.into_rust().map_err(|err| err.to_string()),
// Try to fall back to legacy encoding.
Err(_) => {
let proto = ProtoDataflowErrorLegacy::decode(buf).map_err(|err| err.to_string())?;
proto.into_rust().map_err(|err| err.to_string())
}
}
}

impl RustType<ProtoSourceDataLegacy> for SourceData {
fn into_proto(&self) -> ProtoSourceDataLegacy {
use proto_source_data_legacy::Kind;
ProtoSourceDataLegacy {
kind: Some(match &**self {
Ok(row) => Kind::Ok(row.into_proto()),
Err(err) => Kind::Err(err.into_proto()),
}),
}
}

fn from_proto(proto: ProtoSourceDataLegacy) -> Result<Self, TryFromProtoError> {
use proto_source_data_legacy::Kind;
match proto.kind {
Some(kind) => match kind {
Kind::Ok(row) => Ok(SourceData(Ok(row.into_rust()?))),
Kind::Err(err) => Ok(SourceData(Err(err.into_rust()?))),
},
None => Result::Err(TryFromProtoError::missing_field(
"ProtoSourceDataLegacy::kind",
)),
}
}
}

impl RustType<ProtoDataflowErrorLegacy> for DataflowError {
fn into_proto(&self) -> ProtoDataflowErrorLegacy {
use proto_dataflow_error_legacy::Kind::*;
ProtoDataflowErrorLegacy {
kind: Some(match self {
DataflowError::DecodeError(err) => DecodeError(*err.into_proto()),
DataflowError::EvalError(err) => EvalError(*err.into_proto()),
DataflowError::SourceError(err) => SourceError(*err.into_proto()),
DataflowError::EnvelopeError(err) => EnvelopeErrorV1(Box::new(*err.into_proto())),
}),
}
}

fn from_proto(proto: ProtoDataflowErrorLegacy) -> Result<Self, TryFromProtoError> {
use proto_dataflow_error_legacy::Kind::*;
match proto.kind {
Some(kind) => match kind {
DecodeError(err) => Ok(DataflowError::DecodeError(Box::new(err.into_rust()?))),
EvalError(err) => Ok(DataflowError::EvalError(Box::new(err.into_rust()?))),
SourceError(err) => Ok(DataflowError::SourceError(Box::new(err.into_rust()?))),
EnvelopeErrorV1(err) => {
Ok(DataflowError::EnvelopeError(Box::new((*err).into_rust()?)))
}
},
None => Err(TryFromProtoError::missing_field(
"ProtoDataflowErrorLegacy::kind",
)),
}
}
}

impl RustType<ProtoEnvelopeErrorV1Legacy> for EnvelopeError {
fn into_proto(&self) -> ProtoEnvelopeErrorV1Legacy {
use proto_envelope_error_v1_legacy::Kind;
ProtoEnvelopeErrorV1Legacy {
kind: Some(match self {
EnvelopeError::Debezium(text) => Kind::Debezium(text.clone()),
EnvelopeError::Upsert(rust) => Kind::Upsert(Box::new(rust.into_proto())),
EnvelopeError::Flat(text) => Kind::Flat(text.clone()),
}),
}
}

fn from_proto(proto: ProtoEnvelopeErrorV1Legacy) -> Result<Self, TryFromProtoError> {
use proto_envelope_error_v1_legacy::Kind;
match proto.kind {
Some(Kind::Debezium(text)) => Ok(Self::Debezium(text)),
Some(Kind::Upsert(proto)) => {
let rust = RustType::from_proto(*proto)?;
Ok(Self::Upsert(rust))
}
Some(Kind::Flat(text)) => Ok(Self::Flat(text)),
None => Err(TryFromProtoError::missing_field(
"ProtoEnvelopeErrorV1Legacy::kind",
)),
}
}
}

impl RustType<ProtoUpsertErrorLegacy> for UpsertError {
fn into_proto(&self) -> ProtoUpsertErrorLegacy {
use proto_upsert_error_legacy::Kind;
ProtoUpsertErrorLegacy {
kind: Some(match self {
UpsertError::KeyDecode(err) => Kind::KeyDecode(err.into_proto()),
UpsertError::Value(err) => Kind::Value(Box::new(err.into_proto())),
UpsertError::NullKey(err) => Kind::NullKey(err.into_proto()),
}),
}
}

fn from_proto(proto: ProtoUpsertErrorLegacy) -> Result<Self, TryFromProtoError> {
use proto_upsert_error_legacy::Kind;
match proto.kind {
Some(Kind::KeyDecode(proto)) => {
let rust = RustType::from_proto(proto)?;
Ok(Self::KeyDecode(rust))
}
Some(Kind::Value(proto)) => {
let rust = RustType::from_proto(*proto)?;
Ok(Self::Value(rust))
}
Some(Kind::NullKey(proto)) => {
let rust = RustType::from_proto(proto)?;
Ok(Self::NullKey(rust))
}
None => Err(TryFromProtoError::missing_field(
"ProtoUpsertErrorLegacy::kind",
)),
}
}
}

impl RustType<ProtoUpsertValueErrorLegacy> for UpsertValueError {
fn into_proto(&self) -> ProtoUpsertValueErrorLegacy {
let inner = ProtoDataflowErrorLegacy {
kind: Some(proto_dataflow_error_legacy::Kind::DecodeError(
self.inner.into_proto(),
)),
};

ProtoUpsertValueErrorLegacy {
inner: Some(Box::new(inner)),
for_key: Some(self.for_key.into_proto()),
}
}

fn from_proto(proto: ProtoUpsertValueErrorLegacy) -> Result<Self, TryFromProtoError> {
let inner = match proto.inner {
Some(inner) => match inner.kind {
Some(proto_dataflow_error_legacy::Kind::DecodeError(error)) => {
RustType::from_proto(error)?
}
_ => panic!("unexpected kind in ProtoUpsertValueErrorLegacy: {inner:?}"),
},
None => {
return Err(TryFromProtoError::missing_field(
"ProtoUpsertValueErrorLegacy::inner",
))
}
};
let for_key = match proto.for_key {
Some(key) => RustType::from_proto(key)?,
None => {
return Err(TryFromProtoError::missing_field(
"ProtoUpsertValueErrorLegacy::for_key",
))
}
};
Ok(Self { inner, for_key })
}
}

0 comments on commit b57dcde

Please sign in to comment.