forked from googleapis/google-cloud-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: iterator.go
413 lines (386 loc) · 13.4 KB
/
iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
// Copyright 2015 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bigquery
import (
"context"
"errors"
"fmt"
"reflect"
bq "google.golang.org/api/bigquery/v2"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
)
// newRowIterator builds a RowIterator that reads rows from src via the
// supplied pageFetcher, wiring it into the generic pagination machinery
// from the google.golang.org/api/iterator package.
func newRowIterator(ctx context.Context, src *rowSource, pf pageFetcher) *RowIterator {
	ri := &RowIterator{
		ctx: ctx,
		src: src,
		pf:  pf,
	}
	ri.pageInfo, ri.nextFunc = iterator.NewPageInfo(
		ri.fetch,
		func() int { return len(ri.rows) },
		func() interface{} {
			buffered := ri.rows
			ri.rows = nil
			return buffered
		})
	return ri
}
// A RowIterator provides access to the result of a BigQuery lookup.
type RowIterator struct {
	// ctx governs all backend calls issued on behalf of this iterator.
	ctx context.Context
	// src identifies where rows originate: a job, a table, and/or cached
	// first-page results.
	src *rowSource

	// arrowIterator/arrowDecoder support Arrow-format reads.
	// NOTE(review): populated elsewhere in the package, not in this file —
	// presumably by the Storage API read path; confirm against those callers.
	arrowIterator ArrowIterator
	arrowDecoder  *arrowDecoder

	// pageInfo and nextFunc are produced by iterator.NewPageInfo and drive
	// the standard pagination protocol (see newRowIterator).
	pageInfo *iterator.PageInfo
	nextFunc func() error
	// pf fetches a single page of results from the backend or cache.
	pf pageFetcher

	// StartIndex can be set before the first call to Next. If PageInfo().Token
	// is also set, StartIndex is ignored. If Storage API is enabled,
	// StartIndex is also ignored because is not supported. IsAccelerated()
	// method can be called to check if Storage API is enabled for the RowIterator.
	StartIndex uint64

	// The schema of the table.
	// In some scenarios it will only be available after the first
	// call to Next(), like when a call to Query.Read uses
	// the jobs.query API for an optimized query path.
	Schema Schema

	// The total number of rows in the result.
	// In some scenarios it will only be available after the first
	// call to Next(), like when a call to Query.Read uses
	// the jobs.query API for an optimized query path.
	// May be zero just after rows were inserted.
	TotalRows uint64

	// rows buffers the current page of fetched, converted rows; Next
	// consumes them from the front.
	rows [][]Value

	structLoader structLoader // used to populate a pointer to a struct
}
// SourceJob returns an instance of a Job if the RowIterator is backed by a query,
// or a nil.
func (ri *RowIterator) SourceJob() *Job {
	if ri.src == nil || ri.src.j == nil {
		return nil
	}
	// Return a shallow copy rather than the internal reference.
	j := ri.src.j
	return &Job{
		c:         j.c,
		projectID: j.projectID,
		location:  j.location,
		jobID:     j.jobID,
	}
}
// QueryID returns a query ID if available, or an empty string.
func (ri *RowIterator) QueryID() string {
	if ri.src != nil {
		return ri.src.queryID
	}
	return ""
}
// We declare a function signature for fetching results. The primary reason
// for this is to enable us to swap out the fetch function with alternate
// implementations (e.g. to enable testing).
// The production implementation is fetchPage; see that function for the
// meaning of the startIndex/pageSize/pageToken parameters.
type pageFetcher func(ctx context.Context, _ *rowSource, _ Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error)
// Next loads the next row into dst. Its return value is iterator.Done if there
// are no more results. Once Next returns iterator.Done, all subsequent calls
// will return iterator.Done.
//
// dst may implement ValueLoader, or may be a *[]Value, *map[string]Value, or struct pointer.
//
// If dst is a *[]Value, it will be set to new []Value whose i'th element
// will be populated with the i'th column of the row.
//
// If dst is a *map[string]Value, a new map will be created if dst is nil. Then
// for each schema column name, the map key of that name will be set to the column's
// value. STRUCT types (RECORD types or nested schemas) become nested maps.
//
// If dst is pointer to a struct, each column in the schema will be matched
// with an exported field of the struct that has the same name, ignoring case.
// Unmatched schema columns and struct fields will be ignored.
//
// Each BigQuery column type corresponds to one or more Go types; a matching struct
// field must be of the correct type. The correspondences are:
//
//	STRING      string
//	BOOL        bool
//	INTEGER     int, int8, int16, int32, int64, uint8, uint16, uint32
//	FLOAT       float32, float64
//	BYTES       []byte
//	TIMESTAMP   time.Time
//	DATE        civil.Date
//	TIME        civil.Time
//	DATETIME    civil.DateTime
//	NUMERIC     *big.Rat
//	BIGNUMERIC  *big.Rat
//
// The big.Rat type supports numbers of arbitrary size and precision.
// See https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#numeric-type
// for more on NUMERIC.
//
// A repeated field corresponds to a slice or array of the element type. A STRUCT
// type (RECORD or nested schema) corresponds to a nested struct or struct pointer.
// All calls to Next on the same iterator must use the same struct type.
//
// It is an error to attempt to read a BigQuery NULL value into a struct field,
// unless the field is of type []byte or is one of the special Null types: NullInt64,
// NullFloat64, NullBool, NullString, NullTimestamp, NullDate, NullTime or
// NullDateTime. You can also use a *[]Value or *map[string]Value to read from a
// table with NULLs.
func (it *RowIterator) Next(dst interface{}) error {
	// Resolve the destination into a ValueLoader where possible; a struct
	// pointer is deferred until after the fetch, since it needs the schema.
	var loader ValueLoader
	switch target := dst.(type) {
	case ValueLoader:
		loader = target
	case *[]Value:
		loader = (*valueList)(target)
	case *map[string]Value:
		loader = (*valueMap)(target)
	default:
		if !isStructPtr(dst) {
			return fmt.Errorf("bigquery: cannot convert %T to ValueLoader (need pointer to []Value, map[string]Value, or struct)", dst)
		}
	}
	// Advance the underlying page machinery; this may trigger a fetch.
	if err := it.nextFunc(); err != nil {
		return err
	}
	// Pop the next buffered row.
	row := it.rows[0]
	it.rows = it.rows[1:]
	if loader == nil {
		// dst is a struct pointer. We couldn't build the loader earlier
		// because the schema may only be known after the first fetch.
		if err := it.structLoader.set(dst, it.Schema); err != nil {
			return err
		}
		loader = &it.structLoader
	}
	return loader.Load(row, it.Schema)
}
// isStructPtr reports whether x is a pointer to a struct.
// A nil interface is handled explicitly: reflect.TypeOf(nil) returns a nil
// reflect.Type, and calling Kind on it would panic — without the guard,
// RowIterator.Next(nil) would panic instead of returning a descriptive error.
func isStructPtr(x interface{}) bool {
	t := reflect.TypeOf(x)
	if t == nil {
		return false
	}
	return t.Kind() == reflect.Ptr && t.Elem().Kind() == reflect.Struct
}
// PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
// Currently pagination is not supported when the Storage API is enabled. IsAccelerated()
// method can be called to check if Storage API is enabled for the RowIterator.
func (it *RowIterator) PageInfo() *iterator.PageInfo {
	return it.pageInfo
}
// fetch retrieves one page of results via the configured pageFetcher and
// appends its rows to the iterator's buffer, recording schema and row-count
// metadata. It is invoked indirectly through iterator.PageInfo.
func (it *RowIterator) fetch(pageSize int, pageToken string) (string, error) {
	page, err := it.pf(it.ctx, it.src, it.Schema, it.StartIndex, int64(pageSize), pageToken)
	if err != nil {
		return "", err
	}
	it.rows = append(it.rows, page.rows...)
	// Only adopt the page's schema if we don't already have one.
	if it.Schema == nil {
		it.Schema = page.schema
	}
	it.TotalRows = page.totalRows
	return page.pageToken, nil
}
// rowSource represents one of the multiple sources of data for a row iterator.
// Rows can be read directly from a BigQuery table or from a job reference.
// If a job is present, that's treated as the authoritative source.
//
// rowSource can also cache results for special situations, primarily for the
// fast execution query path which can return status, rows, and schema all at
// once. Our cache data expectations are as follows:
//
//   - We can only cache data from the start of a source.
//   - We need to cache schema, rows, and next page token to effective service
//     a request from cache.
//   - cache references are destroyed as soon as they're interrogated. We don't
//     want to retain the data unnecessarily, and we expect that the backend
//     can always provide them if needed.
type rowSource struct {
	// j, if non-nil, is the authoritative source (see fetchPage).
	j *Job
	// t is used for direct table reads when j is nil.
	t *Table
	// queryID identifies the query, when available (see RowIterator.QueryID).
	queryID string

	// Cached first-page data in raw API representation; cleared by
	// fetchCachedPage once interrogated.
	cachedRows      []*bq.TableRow
	cachedSchema    *bq.TableSchema
	cachedNextToken string
}
// fetchPageResult represents a page of rows returned from the backend.
type fetchPageResult struct {
	// pageToken is the continuation token for the next page; empty when
	// the backend reports no further pages.
	pageToken string
	// rows holds the page's row data, already converted to []Value form.
	rows [][]Value
	// totalRows is taken from the backend response; for cache-served pages
	// it is the number of cached rows (see fetchCachedPage).
	totalRows uint64
	// schema is the schema used to convert the rows.
	schema Schema
}
// fetchPage is our generalized fetch mechanism. It interrogates from cache, and
// then dispatches to either the appropriate job or table-based backend mechanism
// as needed.
func fetchPage(ctx context.Context, src *rowSource, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
	result, err := fetchCachedPage(ctx, src, schema, startIndex, pageSize, pageToken)
	if err != nil {
		// Compare with errors.Is so the sentinel is still recognized if it
		// is ever wrapped with additional context.
		if !errors.Is(err, errNoCacheData) {
			// This likely means something more severe, like a problem with schema.
			return nil, err
		}
		// If we failed to fetch data from cache, invoke the appropriate service method.
		if src.j != nil {
			return fetchJobResultPage(ctx, src, schema, startIndex, pageSize, pageToken)
		}
		if src.t != nil {
			return fetchTableResultPage(ctx, src, schema, startIndex, pageSize, pageToken)
		}
		// No rows, but no table or job reference. Return an empty result set.
		return &fetchPageResult{}, nil
	}
	return result, nil
}
// fetchTableResultPage retrieves one page of rows directly from a table via
// the tabledata.list API. If schema is nil, the table's schema is fetched
// concurrently with the row data and used to convert the rows.
func fetchTableResultPage(ctx context.Context, src *rowSource, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
	// Fetch the table schema in the background, if necessary.
	// errc is buffered (capacity 1) so neither the synchronous send below nor
	// the goroutine's send can block.
	errc := make(chan error, 1)
	if schema != nil {
		errc <- nil
	} else {
		go func() {
			var bqt *bq.Table
			err := runWithRetry(ctx, func() (err error) {
				// Request only the schema field to minimize the payload.
				bqt, err = src.t.c.bqs.Tables.Get(src.t.ProjectID, src.t.DatasetID, src.t.TableID).
					Fields("schema").
					Context(ctx).
					Do()
				return err
			})
			if err == nil && bqt.Schema != nil {
				schema = bqToSchema(bqt.Schema)
			}
			errc <- err
		}()
	}
	call := src.t.c.bqs.Tabledata.List(src.t.ProjectID, src.t.DatasetID, src.t.TableID)
	call = call.FormatOptionsUseInt64Timestamp(true)
	setClientHeader(call.Header())
	// A page token takes precedence over an explicit start index.
	if pageToken != "" {
		call.PageToken(pageToken)
	} else {
		call.StartIndex(startIndex)
	}
	if pageSize > 0 {
		call.MaxResults(pageSize)
	}
	var res *bq.TableDataList
	err := runWithRetry(ctx, func() (err error) {
		res, err = call.Context(ctx).Do()
		return err
	})
	if err != nil {
		return nil, err
	}
	// Wait for the background schema fetch. The channel receive also orders
	// the goroutine's write to schema before our read of it below.
	err = <-errc
	if err != nil {
		return nil, err
	}
	rows, err := convertRows(res.Rows, schema)
	if err != nil {
		return nil, err
	}
	return &fetchPageResult{
		pageToken: res.PageToken,
		rows:      rows,
		totalRows: uint64(res.TotalRows),
		schema:    schema,
	}, nil
}
// fetchJobResultPage retrieves one page of rows for a query job via the
// jobs.getQueryResults API, converting them with the supplied schema (or the
// schema returned by the backend when none was supplied).
func fetchJobResultPage(ctx context.Context, src *rowSource, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
	// reduce data transfered by leveraging api projections
	projectedFields := []googleapi.Field{"rows", "pageToken", "totalRows"}
	if schema == nil {
		// only project schema if we weren't supplied one.
		projectedFields = append(projectedFields, "schema")
	}
	call := src.j.c.bqs.Jobs.GetQueryResults(src.j.projectID, src.j.jobID).Location(src.j.location).Context(ctx)
	call = call.FormatOptionsUseInt64Timestamp(true)
	call = call.Fields(projectedFields...)
	setClientHeader(call.Header())
	// A page token takes precedence over an explicit start index.
	if pageToken == "" {
		call.StartIndex(startIndex)
	} else {
		call.PageToken(pageToken)
	}
	if pageSize > 0 {
		call.MaxResults(pageSize)
	}
	var resp *bq.GetQueryResultsResponse
	err := runWithRetry(ctx, func() (err error) {
		resp, err = call.Do()
		return err
	})
	if err != nil {
		return nil, err
	}
	// Populate schema in the rowsource if it's missing
	if schema == nil {
		schema = bqToSchema(resp.Schema)
	}
	rows, err := convertRows(resp.Rows, schema)
	if err != nil {
		return nil, err
	}
	return &fetchPageResult{
		pageToken: resp.PageToken,
		rows:      rows,
		totalRows: uint64(resp.TotalRows),
		schema:    schema,
	}, nil
}
// errNoCacheData is a sentinel error indicating the rowSource cache could not
// service the request; callers fall back to a backend fetch (see fetchPage).
var errNoCacheData = errors.New("no rows in rowSource cache")
// fetchCachedPage attempts to service the first page of results. For the jobs path specifically, we have an
// opportunity to fetch rows before the iterator is constructed, and thus serve that data as the first request
// without an unnecessary network round trip.
//
// Cache references are single-use: they are cleared on every exit path after
// being interrogated, whether the lookup hits, misses, or errors.
func fetchCachedPage(ctx context.Context, src *rowSource, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
	// we have no cached data
	if src.cachedRows == nil {
		return nil, errNoCacheData
	}
	// we have no schema for decoding. convert from the cached representation if available.
	if schema == nil {
		if src.cachedSchema == nil {
			// We can't progress with no schema, destroy references and return a miss.
			clearRowSourceCache(src)
			return nil, errNoCacheData
		}
		schema = bqToSchema(src.cachedSchema)
	}
	// Only serve from cache where we're confident we know someone's asking for the first page
	// without having to align data.
	//
	// Future consideration: we could service pagesizes smaller than the cache if we're willing to handle generation
	// of pageTokens for the cache.
	wantsFirstPage := pageToken == "" &&
		startIndex == 0 &&
		(pageSize == 0 || pageSize == int64(len(src.cachedRows)))
	if !wantsFirstPage {
		// All other cases are invalid. Destroy any cache references on the way out the door.
		clearRowSourceCache(src)
		return nil, errNoCacheData
	}
	converted, err := convertRows(src.cachedRows, schema)
	if err != nil {
		// destroy cache references and return error
		clearRowSourceCache(src)
		return nil, err
	}
	result := &fetchPageResult{
		pageToken: src.cachedNextToken,
		rows:      converted,
		schema:    schema,
		totalRows: uint64(len(converted)),
	}
	// clear cache references and return response.
	clearRowSourceCache(src)
	return result, nil
}

// clearRowSourceCache drops all cached references on src so the data can be
// reclaimed; the backend can always re-serve it if needed.
func clearRowSourceCache(src *rowSource) {
	src.cachedRows = nil
	src.cachedSchema = nil
	src.cachedNextToken = ""
}