Skip to content

Commit

Permalink
crosscluster/logical: check UDT equivalency during LDR creation
Browse files Browse the repository at this point in the history
This check requires that the logical and physical representations of
each type are identical. In the future, we may investigate ways to only
require logical equivalency.

Release note (ops change): When creating a logical replication stream,
any user-defined types in the source and destination are now checked for
equivalency. This allows for creating a stream that handles user-defined
types without needing to use the `WITH SKIP SCHEMA CHECK` option as long
as the replication stream uses `mode = immediate`.
  • Loading branch information
rafiss committed Oct 23, 2024
1 parent 5eb9caf commit ea57988
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 20 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/crosscluster/logical/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ go_library(
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/ctxgroup",
"//pkg/util/errorutil/unimplemented",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/log/logcrash",
Expand Down
21 changes: 20 additions & 1 deletion pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
"github.com/cockroachdb/cockroach/pkg/sql/importer"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand All @@ -35,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -208,14 +210,31 @@ func createLogicalReplicationStreamPlanHook(
return err
}

sourceTypes := make([]*descpb.TypeDescriptor, len(spec.TypeDescriptors))
for i, desc := range spec.TypeDescriptors {
// Until https://github.com/cockroachdb/cockroach/issues/132164 is resolved,
// we cannot allow user-defined types on the SQL ingestion path.
if m, ok := options.GetMode(); ok && m != "immediate" {
return unimplemented.NewWithIssue(132164, "MODE = 'immediate' cannot be used with user-defined types")
}
sourceTypes[i] = &desc
}
// TODO(rafi): do we need a different type resolver?
// See https://github.com/cockroachdb/cockroach/issues/132164.
importResolver := importer.MakeImportTypeResolver(sourceTypes)

// If the user asked to ignore "ttl-deletes", make sure that at least one of
// the source tables actually has a TTL job which sets the omit bit that
// is used for filtering; if not, they probably forgot that step.
throwNoTTLWithCDCIgnoreError := discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes

for i, name := range srcTableNames {
td := spec.TableDescriptors[name]
srcTableDescs[i] = &td
cpy := tabledesc.NewBuilder(&td).BuildCreatedMutableTable()
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil {
return err
}
srcTableDescs[i] = cpy.TableDesc()
repPairs[i].SrcDescriptorID = int32(td.ID)
if td.RowLevelTTL != nil && td.RowLevelTTL.DisableChangefeedReplication {
throwNoTTLWithCDCIgnoreError = false
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/crosscluster/logical/logical_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
}

// TODO(msbutler): is this import type resolver kosher? Should put in a new package.
// See https://github.com/cockroachdb/cockroach/issues/132164.
importResolver := importer.MakeImportTypeResolver(plan.SourceTypes)
tableMetadataByDestID := make(map[int32]execinfrapb.TableReplicationMetadata)
if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, descriptors *descs.Collection) error {
Expand Down
36 changes: 25 additions & 11 deletions pkg/ccl/crosscluster/logical/logical_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1925,20 +1925,22 @@ func TestUserDefinedTypes(t *testing.T) {
// Create the same user-defined type both tables.
dbA.Exec(t, "CREATE TYPE my_enum AS ENUM ('one', 'two', 'three')")
dbB.Exec(t, "CREATE TYPE my_enum AS ENUM ('one', 'two', 'three')")
dbA.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)")
dbB.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)")

dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val my_enum DEFAULT 'two')")
dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val my_enum DEFAULT 'two')")
dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")
dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")

dbB.Exec(t, "INSERT INTO data VALUES (1, 'one')")
dbB.Exec(t, "INSERT INTO data VALUES (1, 'one', (3, 'cat'))")
// Force default expression evaluation.
dbB.Exec(t, "INSERT INTO data VALUES (2)")
dbB.Exec(t, "INSERT INTO data (pk, val2) VALUES (2, (4, 'dog'))")

var jobAID jobspb.JobID
dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data with skip schema check", dbBURL.String()).Scan(&jobAID)
dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data", dbBURL.String()).Scan(&jobAID)
WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, jobAID)
require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, dbA.DB, "A"))
dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one"}, {"2", "two"}})
dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one"}, {"2", "two"}})
dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
}

// TestLogicalReplicationCreationChecks verifies that we check that the table
Expand Down Expand Up @@ -2100,14 +2102,14 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
"CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(),
)

// Verify that the stream cannot be created with user defined types.
// Verify that the stream cannot be created with mismatched enum types.
dbA.Exec(t, "DROP TRIGGER my_trigger ON tab")
dbA.Exec(t, "CREATE TYPE mytype AS ENUM ('a', 'b', 'c')")
dbB.Exec(t, "CREATE TYPE b.mytype AS ENUM ('a', 'b', 'c')")
dbB.Exec(t, "CREATE TYPE b.mytype AS ENUM ('a', 'b')")
dbA.Exec(t, "ALTER TABLE tab ADD COLUMN enum_col mytype NOT NULL")
dbB.Exec(t, "ALTER TABLE b.tab ADD COLUMN enum_col b.mytype NOT NULL")
dbA.ExpectErr(t,
`cannot create logical replication stream: destination table tab column enum_col has user-defined type USER DEFINED ENUM: public.mytype`,
`cannot create logical replication stream: .* destination type USER DEFINED ENUM: public.mytype has logical representations \[a b c\], but the source type USER DEFINED ENUM: mytype has \[a b\]`,
"CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(),
)
// Allows user to create LDR stream with UDT via SKIP SCHEMA CHECK.
Expand All @@ -2118,9 +2120,21 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
dbA.Exec(t, "CANCEL JOB $1", jobIDSkipSchemaCheck)
jobutils.WaitForJobToCancel(t, dbA, jobIDSkipSchemaCheck)

// Check that UNIQUE indexes match.
// Verify that the stream cannot be created with mismatched composite types.
dbA.Exec(t, "ALTER TABLE tab DROP COLUMN enum_col")
dbB.Exec(t, "ALTER TABLE b.tab DROP COLUMN enum_col")
dbA.Exec(t, "CREATE TYPE composite_typ AS (a INT, b TEXT)")
dbB.Exec(t, "CREATE TYPE b.composite_typ AS (a TEXT, b INT)")
dbA.Exec(t, "ALTER TABLE tab ADD COLUMN composite_udt_col composite_typ NOT NULL")
dbB.Exec(t, "ALTER TABLE b.tab ADD COLUMN composite_udt_col b.composite_typ NOT NULL")
dbA.ExpectErr(t,
`cannot create logical replication stream: .* destination type USER DEFINED RECORD: public.composite_typ tuple element 0 does not match source type USER DEFINED RECORD: composite_typ tuple element 0: destination type INT8 does not match source type STRING`,
"CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String(),
)

// Check that UNIQUE indexes match.
dbA.Exec(t, "ALTER TABLE tab DROP COLUMN composite_udt_col")
dbB.Exec(t, "ALTER TABLE b.tab DROP COLUMN composite_udt_col")
dbA.Exec(t, "CREATE UNIQUE INDEX payload_idx ON tab(payload)")
dbB.Exec(t, "CREATE UNIQUE INDEX multi_idx ON b.tab(composite_col, pk)")
dbA.ExpectErr(t,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@ func (p *partitionedStreamClient) PlanLogicalReplication(
}

sourceTypes := make([]*descpb.TypeDescriptor, len(streamSpec.TypeDescriptors))
for _, desc := range streamSpec.TypeDescriptors {
sourceTypes = append(sourceTypes, &desc)
for i, desc := range streamSpec.TypeDescriptors {
sourceTypes[i] = &desc
}

return LogicalReplicationPlan{
Expand Down
70 changes: 64 additions & 6 deletions pkg/sql/catalog/tabledesc/logical_replication_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package tabledesc

import (
"bytes"
"cmp"
"slices"
"strings"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -203,20 +205,76 @@ func checkSrcDstColsMatch(src *descpb.TableDescriptor, dst *descpb.TableDescript
)
}

if dstCol.Type.UserDefined() {
if err := checkTypesMatch(srcCol.Type, dstCol.Type); err != nil {
return errors.Wrapf(err,
"destination table %s column %s has type %s, but the source table %s has type %s",
dst.Name, dstCol.Name, dstCol.Type.SQLStringForError(), src.Name, srcCol.Type.SQLStringForError(),
)
}
}
return nil
}

// checkTypesMatch checks that the source and destination types match. Enums
// need to be equal in both physical and logical representations.
func checkTypesMatch(srcTyp *types.T, dstTyp *types.T) error {
switch {
case dstTyp.TypeMeta.EnumData != nil:
if srcTyp.TypeMeta.EnumData == nil {
return errors.Newf(
"destination type %s is an ENUM, but the source type %s is not",
dstTyp.SQLStringForError(), srcTyp.SQLStringForError(),
)
}
if !slices.Equal(srcTyp.TypeMeta.EnumData.LogicalRepresentations, dstTyp.TypeMeta.EnumData.LogicalRepresentations) {
return errors.Newf(
"destination type %s has logical representations %v, but the source type %s has %v",
dstTyp.SQLStringForError(), dstTyp.TypeMeta.EnumData.LogicalRepresentations,
srcTyp.SQLStringForError(), srcTyp.TypeMeta.EnumData.LogicalRepresentations,
)
}
if !slices.EqualFunc(
srcTyp.TypeMeta.EnumData.PhysicalRepresentations, dstTyp.TypeMeta.EnumData.PhysicalRepresentations,
func(x, y []byte) bool { return bytes.Equal(x, y) },
) {
return errors.Newf(
"destination table %s column %s has user-defined type %s",
dst.Name, dstCol.Name, dstCol.Type.SQLStringForError(),
"destination type %s and source type %s have mismatched physical representations",
dstTyp.SQLStringForError(), srcTyp.SQLStringForError(),
)
}

if !srcCol.Type.Identical(dstCol.Type) {
case len(dstTyp.TupleContents()) > 0:
if len(srcTyp.TupleContents()) == 0 {
return errors.Newf(
"destination table %s column %s has type %s, but the source table %s has type %s",
dst.Name, dstCol.Name, dstCol.Type.SQLStringForError(), src.Name, srcCol.Type.SQLStringForError(),
"destination type %s is a tuple, but the source type %s is not",
dstTyp.SQLStringForError(), srcTyp.SQLStringForError(),
)
}
if len(dstTyp.TupleContents()) != len(srcTyp.TupleContents()) {
return errors.Newf(
"destination type %s has %d tuple elements, but the source type %s has %d tuple elements",
dstTyp.SQLStringForError(), len(dstTyp.TupleContents()),
srcTyp.SQLStringForError(), len(srcTyp.TupleContents()),
)
}
for i := range dstTyp.TupleContents() {
if err := checkTypesMatch(srcTyp.TupleContents()[i], dstTyp.TupleContents()[i]); err != nil {
return errors.Wrapf(err,
"destination type %s tuple element %d does not match source type %s tuple element %d",
dstTyp.SQLStringForError(), i, srcTyp.SQLStringForError(), i,
)
}
}

default:
if !srcTyp.Identical(dstTyp) {
return errors.Newf(
"destination type %s does not match source type %s",
dstTyp.SQLStringForError(), srcTyp.SQLStringForError(),
)
}
}

return nil
}

Expand Down

0 comments on commit ea57988

Please sign in to comment.