Skip to content

Commit

Permalink
Merge pull request apache#787 from Zariel/iter-scanner
Browse files Browse the repository at this point in the history
add iter scanner interface and method
  • Loading branch information
Zariel authored Sep 17, 2016
2 parents 1b26a6c + 4b9990c commit 6b33c11
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 36 deletions.
21 changes: 8 additions & 13 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,30 +339,25 @@ func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err e

hosts = []*HostInfo{localHost}

iter := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens, release_version FROM system.peers")
if iter == nil {
rows := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens, release_version FROM system.peers").Scanner()
if rows == nil {
return r.prevHosts, r.prevPartitioner, nil
}

var (
host = &HostInfo{port: r.session.cfg.Port}
versionBytes []byte
)
for iter.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens, &versionBytes) {
if err = host.version.unmarshal(versionBytes); err != nil {
log.Printf("invalid peer entry: peer=%s host_id=%s tokens=%v version=%s\n", host.peer, host.hostId, host.tokens, versionBytes)
for rows.Next() {
host := &HostInfo{port: r.session.cfg.Port}
err := rows.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version)
if err != nil {
log.Println(err)
continue
}

if r.matchFilter(host) {
hosts = append(hosts, host)
}
host = &HostInfo{
port: r.session.cfg.Port,
}
}

if err = iter.Close(); err != nil {
if err = rows.Err(); err != nil {
return nil, "", err
}

Expand Down
150 changes: 127 additions & 23 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,128 @@ func (iter *Iter) Columns() []ColumnInfo {
return iter.meta.columns
}

type Scanner interface {
Next() bool
Scan(...interface{}) error
Err() error
}

type iterScanner struct {
iter *Iter
cols [][]byte
}

// Next advances the row pointer to point at the next row, the row is valid until
// the next call of Next. It returns true if there is a row which is available to be
// scanned into with Scan.
// Next must be called before every call to Scan.
func (is *iterScanner) Next() bool {
iter := is.iter
if iter.err != nil {
return false
}

if iter.pos >= iter.numRows {
if iter.next != nil {
is.iter = iter.next.fetch()
return is.Next()
}
return false
}

cols := make([][]byte, len(iter.meta.columns))
for i := 0; i < len(cols); i++ {
col, err := iter.readColumn()
if err != nil {
iter.err = err
return false
}
cols[i] = col
}
is.cols = cols
iter.pos++

return true
}

func scanColumn(p []byte, col ColumnInfo, dest []interface{}) (int, error) {
if dest[0] == nil {
return 1, nil
}

if col.TypeInfo.Type() == TypeTuple {
// this will panic, actually a bug, please report
tuple := col.TypeInfo.(TupleTypeInfo)

count := len(tuple.Elems)
// here we pass in a slice of the struct which has the number number of
// values as elements in the tuple
if err := Unmarshal(col.TypeInfo, p, dest[:count]); err != nil {
return 0, err
}
return count, nil
} else {
if err := Unmarshal(col.TypeInfo, p, dest[0]); err != nil {
return 0, err
}
return 1, nil
}
}

// Scan copies the current row's columns into dest. If the length of dest does not equal
// the number of columns returned in the row an error is returned. If an error is encountered
// when unmarshalling a column into the value in dest an error is returned and the row is invalidated
// until the next call to Next.
// Next must be called before calling Scan, if it is not an error is returned.
func (is *iterScanner) Scan(dest ...interface{}) error {
if is.cols == nil {
return errors.New("gocql: Scan called without calling Next")
}

iter := is.iter
// currently only support scanning into an expand tuple, such that its the same
// as scanning in more values from a single column
if len(dest) != iter.meta.actualColCount {
return fmt.Errorf("gocql: not enough columns to scan into: have %d want %d", len(dest), iter.meta.actualColCount)
}

// i is the current position in dest, could posible replace it and just use
// slices of dest
i := 0
var err error
for _, col := range iter.meta.columns {
var n int
n, err = scanColumn(is.cols[i], col, dest[i:])
if err != nil {
break
}
i += n
}

is.cols = nil

return err
}

// Err returns the if there was one during iteration that resulted in iteration being unable to complete.
// Err will also release resources held by the iterator and should not used after being called.
func (is *iterScanner) Err() error {
iter := is.iter
is.iter = nil
is.cols = nil
return iter.Close()
}

// Scanner returns a row Scanner which provides an interface to scan rows in a manner which is
// similar to database/sql. The iter should NOT be used again after calling this method.
func (iter *Iter) Scanner() Scanner {
if iter == nil {
return nil
}

return &iterScanner{iter: iter}
}

func (iter *Iter) readColumn() ([]byte, error) {
return iter.framer.readBytesInternal()
}
Expand Down Expand Up @@ -1080,37 +1202,19 @@ func (iter *Iter) Scan(dest ...interface{}) bool {
// i is the current position in dest, could posible replace it and just use
// slices of dest
i := 0
for c := range iter.meta.columns {
col := &iter.meta.columns[c]
for _, col := range iter.meta.columns {
colBytes, err := iter.readColumn()
if err != nil {
iter.err = err
return false
}

if dest[i] == nil {
i++
continue
}

switch col.TypeInfo.Type() {
case TypeTuple:
// this will panic, actually a bug, please report
tuple := col.TypeInfo.(TupleTypeInfo)

count := len(tuple.Elems)
// here we pass in a slice of the struct which has the number number of
// values as elements in the tuple
iter.err = Unmarshal(col.TypeInfo, colBytes, dest[i:i+count])
i += count
default:
iter.err = Unmarshal(col.TypeInfo, colBytes, dest[i])
i++
}

if iter.err != nil {
n, err := scanColumn(colBytes, col, dest[i:])
if err != nil {
iter.err = err
return false
}
i += n
}

iter.pos++
Expand Down

0 comments on commit 6b33c11

Please sign in to comment.