Skip to content

Commit

Permalink
metadata: split out column scanning
Browse files Browse the repository at this point in the history
Column scanning is different between protocol versions, split it out
into its own funcs to isolate the chnages.
  • Loading branch information
Zariel committed Oct 22, 2016
1 parent dff0caf commit d9d10be
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 98 deletions.
6 changes: 4 additions & 2 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1812,7 +1812,8 @@ func TestGetColumnMetadata(t *testing.T) {
t.Errorf("Expected %s column kind to be '%s' but it was '%s'", thirdID.Name, ColumnRegular, thirdID.Kind)
}

if thirdID.Index.Name != "index_column_metadata" {
if !session.useSystemSchema && thirdID.Index.Name != "index_column_metadata" {
// TODO(zariel): update metadata to scan index from system_schema
t.Errorf("Expected %s column index name to be 'index_column_metadata' but it was '%s'", thirdID.Name, thirdID.Index.Name)
}
}
Expand Down Expand Up @@ -1876,7 +1877,8 @@ func TestKeyspaceMetadata(t *testing.T) {
if !found {
t.Fatalf("Expected a column definition for 'third_id'")
}
if thirdColumn.Index.Name != "index_metadata" {
if !session.useSystemSchema && thirdColumn.Index.Name != "index_metadata" {
// TODO(zariel): scan index info from system_schema
t.Errorf("Expected column index named 'index_metadata' but was '%s'", thirdColumn.Index.Name)
}
}
Expand Down
242 changes: 146 additions & 96 deletions metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,19 +587,11 @@ func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, e
return tables, nil
}

// query for only the column metadata in the specified keyspace from system.schema_columns
func getColumnMetadata(
session *Session,
keyspaceName string,
) ([]ColumnMetadata, error) {
// Deal with differences in protocol versions
var stmt string
var scan func(*Iter, *ColumnMetadata, *[]byte) bool
if session.cfg.ProtoVersion == 1 {
// V1 does not support the type column, and all returned rows are
// of kind "regular".
stmt = `
SELECT
func (s *Session) scanColumnMetadataV1(keyspace string) ([]ColumnMetadata, error) {
// V1 does not support the type column, and all returned rows are
// of kind "regular".
const stmt = `
SELECT
columnfamily_name,
column_name,
component_index,
Expand All @@ -608,54 +600,57 @@ func getColumnMetadata(
index_type,
index_options
FROM system.schema_columns
WHERE keyspace_name = ?
`
scan = func(
iter *Iter,
column *ColumnMetadata,
indexOptionsJSON *[]byte,
) bool {
// all columns returned by V1 are regular
column.Kind = ColumnRegular
return iter.Scan(
&column.Table,
&column.Name,
&column.ComponentIndex,
&column.Validator,
&column.Index.Name,
&column.Index.Type,
&indexOptionsJSON,
)
WHERE keyspace_name = ?`

var columns []ColumnMetadata

rows := s.control.query(stmt, keyspace).Scanner()
for rows.Next() {
var (
column = ColumnMetadata{Keyspace: keyspace}
indexOptionsJSON []byte
)

// all columns returned by V1 are regular
column.Kind = ColumnRegular

err := rows.Scan(&column.Table,
&column.Name,
&column.ComponentIndex,
&column.Validator,
&column.Index.Name,
&column.Index.Type,
&indexOptionsJSON)

if err != nil {
return nil, err
}
} else if session.useSystemSchema { // Cassandra 3.x+
stmt = `
SELECT
table_name,
column_name,
clustering_order,
type,
kind,
position
FROM system_schema.columns
WHERE keyspace_name = ?
`
scan = func(
iter *Iter,
column *ColumnMetadata,
indexOptionsJSON *[]byte,
) bool {
return iter.Scan(
&column.Table,
&column.Name,
&column.ClusteringOrder,
&column.Validator,
&column.Kind,
&column.ComponentIndex,
)

if len(indexOptionsJSON) > 0 {
err := json.Unmarshal(indexOptionsJSON, &column.Index.Options)
if err != nil {
return nil, fmt.Errorf(
"Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v",
indexOptionsJSON,
column.Name,
column.Table,
err)
}
}
} else {
// V2+ supports the type column
stmt = `

columns = append(columns, column)
}

if err := rows.Err(); err != nil {
return nil, err
}

return columns, nil
}

func (s *Session) scanColumnMetadataV2(keyspace string) ([]ColumnMetadata, error) {
// V2+ supports the type column
const stmt = `
SELECT
columnfamily_name,
column_name,
Expand All @@ -666,57 +661,112 @@ func getColumnMetadata(
index_options,
type
FROM system.schema_columns
WHERE keyspace_name = ?
`
scan = func(
iter *Iter,
column *ColumnMetadata,
indexOptionsJSON *[]byte,
) bool {
return iter.Scan(
&column.Table,
&column.Name,
&column.ComponentIndex,
&column.Validator,
&column.Index.Name,
&column.Index.Type,
&indexOptionsJSON,
&column.Kind,
)
}
}

// get the columns metadata
columns := []ColumnMetadata{}
column := ColumnMetadata{Keyspace: keyspaceName}
WHERE keyspace_name = ?`

var columns []ColumnMetadata

rows := s.control.query(stmt, keyspace).Scanner()
for rows.Next() {
var (
column = ColumnMetadata{Keyspace: keyspace}
indexOptionsJSON []byte
)

err := rows.Scan(&column.Table,
&column.Name,
&column.ComponentIndex,
&column.Validator,
&column.Index.Name,
&column.Index.Type,
&indexOptionsJSON,
&column.Kind,
)

var indexOptionsJSON []byte

iter := session.control.query(stmt, keyspaceName)

for scan(iter, &column, &indexOptionsJSON) {
var err error
if err != nil {
return nil, err
}

// decode the index options
if indexOptionsJSON != nil {
err = json.Unmarshal(indexOptionsJSON, &column.Index.Options)
if len(indexOptionsJSON) > 0 {
err := json.Unmarshal(indexOptionsJSON, &column.Index.Options)
if err != nil {
iter.Close()
return nil, fmt.Errorf(
"Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v",
indexOptionsJSON,
column.Name,
column.Table,
err,
)
err)
}
}

columns = append(columns, column)
column = ColumnMetadata{Keyspace: keyspaceName}
}

err := iter.Close()
if err := rows.Err(); err != nil {
return nil, err
}

return columns, nil

}

func (s *Session) scanColumnMetadataSystem(keyspace string) ([]ColumnMetadata, error) {
const stmt = `
SELECT
table_name,
column_name,
clustering_order,
type,
kind,
position
FROM system_schema.columns
WHERE keyspace_name = ?`

var columns []ColumnMetadata

rows := s.control.query(stmt, keyspace).Scanner()
for rows.Next() {
column := ColumnMetadata{Keyspace: keyspace}

err := rows.Scan(&column.Table,
&column.Name,
&column.ClusteringOrder,
&column.Validator,
&column.Kind,
&column.ComponentIndex,
)

if err != nil {
return nil, err
}

columns = append(columns, column)
}

if err := rows.Err(); err != nil {
return nil, err
}

// TODO(zariel): get column index info from system_schema.indexes

return columns, nil
}

// query for only the column metadata in the specified keyspace from system.schema_columns
func getColumnMetadata(session *Session, keyspaceName string) ([]ColumnMetadata, error) {
var (
columns []ColumnMetadata
err error
)

// Deal with differences in protocol versions
if session.cfg.ProtoVersion == 1 {
columns, err = session.scanColumnMetadataV1(keyspaceName)
} else if session.useSystemSchema { // Cassandra 3.x+
columns, err = session.scanColumnMetadataSystem(keyspaceName)
} else {
columns, err = session.scanColumnMetadataV2(keyspaceName)
}

if err != nil && err != ErrNotFound {
return nil, fmt.Errorf("Error querying column schema: %v", err)
}
Expand Down

0 comments on commit d9d10be

Please sign in to comment.