Skip to content

Commit

Permalink
more decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
wenyihu6 committed Jan 27, 2025
1 parent a4d12d2 commit 3aaac32
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 21 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdcevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ func (d *eventDecoder) initForKey(
ctx context.Context, key roachpb.Key, schemaTS hlc.Timestamp, keyOnly bool,
) error {
desc, familyID, err := d.rfCache.tableDescForKey(ctx, key, schemaTS)
fmt.Printf("desc: %v, familyID: %v, err: %v, schemaTS: %v\n", desc, familyID, err, schemaTS)
if err != nil {
return err
}
Expand Down
75 changes: 64 additions & 11 deletions pkg/ccl/changefeedccl/cdctest/nemeses.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,47 @@ func (no NemesesOption) String() string {
no.EnableFpValidator, no.EnableSQLSmith)
}

func dumpTable(db *gosql.DB, tableName string) {
fmt.Print("---------------------------------\n")
fmt.Printf("start dumping table: %s\n", tableName)
rows, err := db.Query(fmt.Sprintf("SELECT * FROM %s", tableName))
if err != nil {
fmt.Printf("error: %v\n", err)
}
defer rows.Close()

columns, err := rows.Columns()
if err != nil {
fmt.Printf("error: %v\n", err)
}

values := make([]interface{}, len(columns))
valuePtrs := make([]interface{}, len(columns))
for i := range values {
valuePtrs[i] = &values[i]
}

fmt.Printf("Table %s:", tableName)

for rows.Next() {
if err := rows.Scan(valuePtrs...); err != nil {
fmt.Printf("error: %v\n", err)
}

for i, col := range columns {
fmt.Printf(" %s: %v", col, values[i])
}
fmt.Printf("\n")
}

fmt.Printf("end dumping table: %s\n", tableName)
fmt.Print("---------------------------------\n")
if err := rows.Err(); err != nil {
fmt.Printf("error: %v\n", err)

}
}

// RunNemesis runs a jepsen-style validation of whether a changefeed meets our
// user-facing guarantees. It's driven by a state machine with various nemeses:
// txn begin/commit/rollback, job pause/unpause.
Expand All @@ -96,6 +137,7 @@ func (no NemesesOption) String() string {
// real output of a changefeed. The output rows and resolved timestamps of the
// tested feed are fed into them to check for anomalies.
func RunNemesis(
l func(format string, args ...any),
f TestFeedFactory,
db *gosql.DB,
testName string,
Expand Down Expand Up @@ -194,10 +236,10 @@ func RunNemesis(
},
}

if nOp.EnableFpValidator {
// TODO(#139351): Fingerprint validator doesn't support user defined types.
ns.eventMix[eventCreateEnum{}] = 0
}
//if nOp.EnableFpValidator {
// // TODO(#139351): Fingerprint validator doesn't support user defined types.
//}
ns.eventMix[eventCreateEnum{}] = 0

// Create the table and set up some initial splits.
if _, err := db.Exec(`CREATE TABLE foo (id INT PRIMARY KEY, ts STRING DEFAULT '0')`); err != nil {
Expand Down Expand Up @@ -247,12 +289,15 @@ func RunNemesis(
for i := 0; i < numInserts; i++ {
query := queryGen.Generate()
if _, err := db.Exec(query); err != nil {
log.Infof(ctx, "Skipping query %s because error %s", query, err)
continue
} else {
log.Infof(ctx, "Executing query %s", query)
}
}
}

dumpTable(db, "foo")

cfo := newChangefeedOption(testName)
changefeedStatement := fmt.Sprintf(
`CREATE CHANGEFEED FOR foo WITH updated, resolved, diff%s`,
Expand Down Expand Up @@ -785,13 +830,18 @@ func addColumn(a fsm.Args) error {
case addColumnTypeEnum:
// Pick a random enum to add.
enum := payload.enum
if _, err := ns.db.Exec(fmt.Sprintf(`ALTER TABLE foo ADD COLUMN test%d enum%d DEFAULT 'hello'`,
ns.currentTestColumnCount, enum)); err != nil {
execS := fmt.Sprintf(`ALTER TABLE foo ADD COLUMN test%d enum%d DEFAULT 'hello'`,
ns.currentTestColumnCount, enum)
dumpTable(ns.db, "foo")
if _, err := ns.db.Exec(execS); err != nil {
return err
}
case addColumnTypeString:
if _, err := ns.db.Exec(fmt.Sprintf(`ALTER TABLE foo ADD COLUMN test%d STRING DEFAULT 'x'`,
ns.currentTestColumnCount)); err != nil {
execS := fmt.Sprintf(`ALTER TABLE foo ADD COLUMN test%d STRING DEFAULT 'x'`,
ns.currentTestColumnCount)
dumpTable(ns.db, "foo")
fmt.Println(execS)
if _, err := ns.db.Exec(execS); err != nil {
return err
}
}
Expand Down Expand Up @@ -820,8 +870,11 @@ func removeColumn(a fsm.Args) error {
return errors.AssertionFailedf(`removeColumn should be called with` +
`at least one test column.`)
}
if _, err := ns.db.Exec(fmt.Sprintf(`ALTER TABLE foo DROP COLUMN test%d`,
ns.currentTestColumnCount-1)); err != nil {
execS := fmt.Sprintf(`ALTER TABLE foo DROP COLUMN test%d`,
ns.currentTestColumnCount-1)
fmt.Println(execS)
dumpTable(ns.db, "foo")
if _, err := ns.db.Exec(execS); err != nil {
return err
}
ns.currentTestColumnCount--
Expand Down
15 changes: 8 additions & 7 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,14 @@ var testSinkFlushFrequency = 100 * time.Millisecond
// schema changer with a probability of 10% using the provided SQL DB
// connection. This returns true if the declarative schema changer is disabled.
func maybeDisableDeclarativeSchemaChangesForTest(t testing.TB, sqlDB *sqlutils.SQLRunner) bool {
disable := rand.Float32() < 0.1
if disable {
t.Log("using legacy schema changer")
sqlDB.Exec(t, "SET use_declarative_schema_changer='off'")
sqlDB.Exec(t, "SET CLUSTER SETTING sql.defaults.use_declarative_schema_changer='off'")
}
return disable
//disable := rand.Float32() < 0.1
//if disable {
//
//}
t.Log("using legacy schema changer")
sqlDB.Exec(t, "SET use_declarative_schema_changer='off'")
sqlDB.Exec(t, "SET CLUSTER SETTING sql.defaults.use_declarative_schema_changer='off'")
return true
}

func waitForSchemaChange(
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/nemeses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestChangefeedNemeses(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
withLegacySchemaChanger := maybeDisableDeclarativeSchemaChangesForTest(t, sqlDB)

v, err := cdctest.RunNemesis(f, s.DB, t.Name(), withLegacySchemaChanger, rng, nop)
v, err := cdctest.RunNemesis(t.Logf, f, s.DB, t.Name(), withLegacySchemaChanger, rng, nop)
if err != nil {
t.Fatalf("%+v", err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/row/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,7 @@ func (rf *Fetcher) prettyKeyDatums(
// it encountered a null while decoding.
func (rf *Fetcher) DecodeIndexKey(key roachpb.Key) (remaining []byte, foundNull bool, err error) {
key = key[rf.table.spec.KeyPrefixLength:]
fmt.Println("at rf fetcher: ", key)
return rowenc.DecodeKeyValsUsingSpec(rf.table.spec.KeyAndSuffixColumns, key, rf.table.keyVals)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/util/encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"encoding/binary"
"encoding/hex"
"fmt"
"github.com/cockroachdb/apd"
"math"
"strconv"
"strings"
Expand All @@ -19,7 +20,6 @@ import (
"unicode/utf8"
"unsafe"

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/geo/geopb"
"github.com/cockroachdb/cockroach/pkg/util/bitarray"
"github.com/cockroachdb/cockroach/pkg/util/duration"
Expand Down Expand Up @@ -2091,7 +2091,7 @@ func PeekLength(b []byte) (int, error) {
if m >= decimalNaN && m <= decimalNaNDesc {
return getDecimalLen(b)
}
return 0, errors.Errorf("unknown tag %d", m)
return 0, errors.Errorf("unknown tag %d, bytes: %x", m, b)
}

// PrettyPrintValue returns the string representation of all contiguous
Expand Down

0 comments on commit 3aaac32

Please sign in to comment.