Skip to content

Commit

Permalink
catalog: Add unsigned ints to builtin tables (MaterializeInc#14419)
Browse files Browse the repository at this point in the history
This commit is a follow up of b9adc8c
and changes more signed integers to unsigned integers.
  • Loading branch information
jkosh44 committed Aug 25, 2022
1 parent e334775 commit 01ff11d
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 43 deletions.
4 changes: 2 additions & 2 deletions misc/dbt-materialize/tests/adapter/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@
actual_indexes = """
SELECT
o.name,
ic.index_position,
ic.on_position,
ic.index_position::int8,
ic.on_position::int8,
ic.on_expression
FROM mz_indexes i
JOIN mz_index_columns ic ON i.id = ic.index_id
Expand Down
22 changes: 11 additions & 11 deletions src/adapter/src/catalog/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1070,7 +1070,7 @@ pub static MZ_COLUMNS: Lazy<BuiltinTable> = Lazy::new(|| BuiltinTable {
desc: RelationDesc::empty()
.with_column("id", ScalarType::String.nullable(false))
.with_column("name", ScalarType::String.nullable(false))
.with_column("position", ScalarType::Int64.nullable(false))
.with_column("position", ScalarType::UInt64.nullable(false))
.with_column("nullable", ScalarType::Bool.nullable(false))
.with_column("type", ScalarType::String.nullable(false))
.with_column("default", ScalarType::String.nullable(true))
Expand All @@ -1091,8 +1091,8 @@ pub static MZ_INDEX_COLUMNS: Lazy<BuiltinTable> = Lazy::new(|| BuiltinTable {
schema: MZ_CATALOG_SCHEMA,
desc: RelationDesc::empty()
.with_column("index_id", ScalarType::String.nullable(false))
.with_column("index_position", ScalarType::Int64.nullable(false))
.with_column("on_position", ScalarType::Int64.nullable(true))
.with_column("index_position", ScalarType::UInt64.nullable(false))
.with_column("on_position", ScalarType::UInt64.nullable(true))
.with_column("on_expression", ScalarType::String.nullable(true))
.with_column("nullable", ScalarType::Bool.nullable(false)),
});
Expand Down Expand Up @@ -1278,7 +1278,7 @@ pub static MZ_AUDIT_EVENTS: Lazy<BuiltinTable> = Lazy::new(|| BuiltinTable {
name: "mz_audit_events",
schema: MZ_CATALOG_SCHEMA,
desc: RelationDesc::empty()
.with_column("id", ScalarType::Int64.nullable(false))
.with_column("id", ScalarType::UInt64.nullable(false))
.with_column("event_type", ScalarType::String.nullable(false))
.with_column("object_type", ScalarType::String.nullable(false))
.with_column("event_details", ScalarType::Jsonb.nullable(false))
Expand Down Expand Up @@ -1306,9 +1306,9 @@ pub static MZ_STORAGE_USAGE: Lazy<BuiltinTable> = Lazy::new(|| BuiltinTable {
name: "mz_storage_usage",
schema: MZ_CATALOG_SCHEMA,
desc: RelationDesc::empty()
.with_column("id", ScalarType::Int64.nullable(false))
.with_column("id", ScalarType::UInt64.nullable(false))
.with_column("object_id", ScalarType::String.nullable(true))
.with_column("size_bytes", ScalarType::Int64.nullable(false))
.with_column("size_bytes", ScalarType::UInt64.nullable(false))
.with_column(
"collection_timestamp",
ScalarType::TimestampTz.nullable(false),
Expand Down Expand Up @@ -1609,13 +1609,13 @@ pub const PG_INDEX: BuiltinView = BuiltinView {
-- MZ doesn't support replication so indisreplident is filled with false
false::pg_catalog.bool AS indisreplident,
-- Return zero if the index attribute is not a simple column reference, column position otherwise
pg_catalog.string_agg(coalesce(mz_index_columns.on_position, 0)::pg_catalog.text, ' ' ORDER BY mz_index_columns.index_position)::pg_catalog.int2vector AS indkey,
pg_catalog.string_agg(coalesce(mz_index_columns.on_position::int8, 0)::pg_catalog.text, ' ' ORDER BY mz_index_columns.index_position::int8)::pg_catalog.int2vector AS indkey,
-- MZ doesn't have per-column flags, so returning a 0 for each column in the index
pg_catalog.string_agg('0', ' ')::pg_catalog.int2vector AS indoption,
-- Index expressions are returned in MZ format
CASE pg_catalog.string_agg(mz_index_columns.on_expression, ' ' ORDER BY mz_index_columns.index_position)
CASE pg_catalog.string_agg(mz_index_columns.on_expression, ' ' ORDER BY mz_index_columns.index_position::int8)
WHEN NULL THEN NULL
ELSE '{' || pg_catalog.string_agg(mz_index_columns.on_expression, '}, {' ORDER BY mz_index_columns.index_position) || '}'
ELSE '{' || pg_catalog.string_agg(mz_index_columns.on_expression, '}, {' ORDER BY mz_index_columns.index_position::int8) || '}'
END AS indexprs,
-- MZ doesn't support indexes with predicates
NULL::pg_catalog.text AS indpred
Expand Down Expand Up @@ -1785,7 +1785,7 @@ pub const PG_ATTRDEF: BuiltinView = BuiltinView {
sql: "CREATE VIEW pg_catalog.pg_attrdef AS SELECT
NULL::pg_catalog.oid AS oid,
mz_objects.oid AS adrelid,
mz_columns.position AS adnum,
mz_columns.position::int8 AS adnum,
mz_columns.default AS adbin,
mz_columns.default AS adsrc
FROM mz_catalog.mz_columns
Expand Down Expand Up @@ -2045,7 +2045,7 @@ SELECT
s.name AS table_schema,
o.name AS table_name,
c.name AS column_name,
c.position AS ordinal_position,
c.position::int8 AS ordinal_position,
c.type AS data_type,
NULL::pg_catalog.int4 AS character_maximum_length,
NULL::pg_catalog.int4 AS numeric_precision,
Expand Down
38 changes: 11 additions & 27 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use mz_compute_client::command::{ProcessId, ReplicaId};
use mz_compute_client::controller::ComputeInstanceId;
use mz_controller::{ComputeInstanceStatus, ConcreteComputeInstanceReplicaLocation};
use mz_expr::MirScalarExpr;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_repr::adt::array::ArrayDimension;
use mz_repr::adt::jsonb::Jsonb;
Expand Down Expand Up @@ -228,7 +229,7 @@ impl CatalogState {
row: Row::pack_slice(&[
Datum::String(&id.to_string()),
Datum::String(column_name.as_str()),
Datum::Int64(i as i64 + 1),
Datum::UInt64(u64::cast_from(i + 1)),
Datum::from(column_type.nullable),
Datum::String(pgtype.name()),
default,
Expand Down Expand Up @@ -498,23 +499,22 @@ impl CatalogState {
.column_types,
)
.nullable;
let seq_in_index = i64::try_from(i + 1).expect("invalid index sequence number");
let seq_in_index = u64::cast_from(i + 1);
let key_sql = key_sqls
.get(i)
.expect("missing sql information for index key")
.to_string();
let (field_number, expression) = match key {
MirScalarExpr::Column(col) => (
Datum::Int64(i64::try_from(*col + 1).expect("invalid index column number")),
Datum::Null,
),
MirScalarExpr::Column(col) => {
(Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
}
_ => (Datum::Null, Datum::String(&key_sql)),
};
updates.push(BuiltinTableUpdate {
id: self.resolve_builtin_table(&MZ_INDEX_COLUMNS),
row: Row::pack_slice(&[
Datum::String(&id.to_string()),
Datum::Int64(seq_in_index),
Datum::UInt64(seq_in_index),
field_number,
expression,
Datum::from(nullable),
Expand Down Expand Up @@ -688,16 +688,11 @@ impl CatalogState {
.into_row();
let event_details = event_details.iter().next().unwrap();
let dt = mz_ore::now::to_datetime(occurred_at).naive_utc();
let id = i64::try_from(event.sortable_id()).map_err(|e| {
Error::new(ErrorKind::Unstructured(format!(
"exceeded event id space: {}",
e
)))
})?;
let id = event.sortable_id();
Ok(BuiltinTableUpdate {
id: self.resolve_builtin_table(&MZ_AUDIT_EVENTS),
row: Row::pack_slice(&[
Datum::Int64(id),
Datum::UInt64(id),
Datum::String(&format!("{}", event_type)),
Datum::String(&format!("{}", object_type)),
event_details,
Expand Down Expand Up @@ -735,12 +730,6 @@ impl CatalogState {
}
};

let valid_id = i64::try_from(id).map_err(|e| {
Error::new(ErrorKind::Unstructured(format!(
"exceeded event id space: {}",
e
)))
})?;
let table = self.resolve_builtin_table(&MZ_STORAGE_USAGE);
let object_id_val = match object_id {
Some(s) => Datum::String(s),
Expand All @@ -749,14 +738,9 @@ impl CatalogState {
let dt = mz_ore::now::to_datetime(collection_timestamp).naive_utc();

let row = Row::pack_slice(&[
Datum::Int64(valid_id),
Datum::UInt64(id),
object_id_val,
Datum::Int64(
size_bytes
.try_into()
// Unless one object has over 9k petabytes of storage...
.expect("Storage bytes size should not overflow i64"),
),
Datum::UInt64(size_bytes),
Datum::TimestampTz(DateTime::from_utc(dt, Utc)),
]);
let diff = 1;
Expand Down
26 changes: 24 additions & 2 deletions src/environmentd/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,37 @@

//! Integration tests for Materialize server.

use bytes::Buf;
use std::error::Error;
use std::thread;
use std::time::Duration;

use mz_ore::retry::Retry;
use reqwest::{blocking::Client, StatusCode, Url};
use serde_json::json;
use tokio_postgres::types::{FromSql, Type};

use crate::util::KAFKA_ADDRS;

pub mod util;

#[derive(Debug)]
struct UInt8(u64);

impl<'a> FromSql<'a> for UInt8 {
fn from_sql(_: &Type, mut raw: &'a [u8]) -> Result<Self, Box<dyn Error + Sync + Send>> {
let v = raw.get_u64();
if !raw.is_empty() {
return Err("invalid buffer size".into());
}
Ok(Self(v))
}

fn accepts(ty: &Type) -> bool {
ty.oid() == mz_pgrepr::oid::TYPE_UINT8_OID
}
}

#[test]
fn test_persistence() -> Result<(), Box<dyn Error>> {
mz_ore::test::init_logging();
Expand Down Expand Up @@ -64,8 +83,11 @@ fn test_persistence() -> Result<(), Box<dyn Error>> {
client
.query("SHOW INDEXES FROM mat", &[])?
.into_iter()
.map(|row| (row.get("Column_name"), row.get("Seq_in_index")))
.collect::<Vec<(String, i64)>>(),
.map(|row| (
row.get("Column_name"),
row.get::<_, UInt8>("Seq_in_index").0
))
.collect::<Vec<(String, u64)>>(),
&[
("a".into(), 1),
("a_data".into(), 2),
Expand Down
2 changes: 1 addition & 1 deletion test/testdrive/pg-catalog.td
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ attrelid false oid
attname false text
atttypid false oid
attlen true smallint
attnum false bigint
attnum false uint8
atttypmod false integer
attnotnull false boolean
atthasdef false boolean
Expand Down

0 comments on commit 01ff11d

Please sign in to comment.