Skip to content

Commit

Permalink
feat(bigquery): expose connections and schema autodetect modifier (#5739
Browse files Browse the repository at this point in the history
)

* feat(bigquery): expose connections and schema autodetect modifier

This exposes the connectionID in the in the external data config,
and wires in the new autodetect schema as a variadic option on the
Update() method for tables.

Technically this is a breaking signature change, but in practice
variadic options can be omitted entirely, so existing code will
continue to work without issue.
  • Loading branch information
shollyman authored May 3, 2022
1 parent 9e979c9 commit c72e34f
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 5 deletions.
7 changes: 7 additions & 0 deletions bigquery/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ type ExternalDataConfig struct {
//
// StringTargetType supports all precision and scale values.
DecimalTargetTypes []DecimalTargetType

// ConnectionID associates an external data configuration with a connection ID.
// Connections are managed through the BigQuery Connection API:
// https://pkg.go.dev/cloud.google.com/go/bigquery/connection/apiv1
ConnectionID string
}

func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
Expand All @@ -115,6 +120,7 @@ func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
IgnoreUnknownValues: e.IgnoreUnknownValues,
MaxBadRecords: e.MaxBadRecords,
HivePartitioningOptions: e.HivePartitioningOptions.toBQ(),
ConnectionId: e.ConnectionID,
}
if e.Schema != nil {
q.Schema = e.Schema.toBQ()
Expand All @@ -138,6 +144,7 @@ func bqToExternalDataConfig(q *bq.ExternalDataConfiguration) (*ExternalDataConfi
MaxBadRecords: q.MaxBadRecords,
Schema: bqToSchema(q.Schema),
HivePartitioningOptions: bqToHivePartitioningOptions(q.HivePartitioningOptions),
ConnectionID: q.ConnectionId,
}
for _, v := range q.DecimalTargetTypes {
e.DecimalTargetTypes = append(e.DecimalTargetTypes, DecimalTargetType(v))
Expand Down
1 change: 1 addition & 0 deletions bigquery/external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func TestExternalDataConfig(t *testing.T) {
SkipLeadingRows: 3,
NullMarker: "marker",
},
ConnectionID: "connection",
},
{
SourceFormat: GoogleSheets,
Expand Down
77 changes: 77 additions & 0 deletions bigquery/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1619,6 +1619,83 @@ func TestIntegration_IteratorSource(t *testing.T) {
}
}

func TestIntegration_ExternalAutodetect(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()

testTable := dataset.Table(tableIDs.New())

origExtCfg := &ExternalDataConfig{
SourceFormat: Avro,
SourceURIs: []string{"gs://cloud-samples-data/bigquery/autodetect-samples/original*.avro"},
}

err := testTable.Create(ctx, &TableMetadata{
ExternalDataConfig: origExtCfg,
})
if err != nil {
t.Fatalf("Table.Create(%q): %v", testTable.FullyQualifiedName(), err)
}

origMeta, err := testTable.Metadata(ctx)
if err != nil {
t.Fatalf("Table.Metadata(%q): %v", testTable.FullyQualifiedName(), err)
}

wantSchema := Schema{
{Name: "stringfield", Type: "STRING"},
{Name: "int64field", Type: "INTEGER"},
}
if diff := testutil.Diff(origMeta.Schema, wantSchema); diff != "" {
t.Fatalf("orig schema, got=-, want=+\n%s", diff)
}

// Now, point at the new files, but don't signal autodetect.
newExtCfg := &ExternalDataConfig{
SourceFormat: Avro,
SourceURIs: []string{"gs://cloud-samples-data/bigquery/autodetect-samples/widened*.avro"},
}

newMeta, err := testTable.Update(ctx, TableMetadataToUpdate{
ExternalDataConfig: newExtCfg,
}, origMeta.ETag)
if err != nil {
t.Fatalf("Table.Update(%q): %v", testTable.FullyQualifiedName(), err)
}
if diff := testutil.Diff(newMeta.Schema, wantSchema); diff != "" {
t.Fatalf("new schema, got=-, want=+\n%s", diff)
}

// Now, signal autodetect in another update.
// This should yield a new schema.
newMeta2, err := testTable.Update(ctx, TableMetadataToUpdate{}, newMeta.ETag, WithAutoDetectSchema(true))
if err != nil {
t.Fatalf("Table.Update(%q) with autodetect: %v", testTable.FullyQualifiedName(), err)
}

wantSchema2 := Schema{
{Name: "stringfield", Type: "STRING"},
{Name: "int64field", Type: "INTEGER"},
{Name: "otherfield", Type: "INTEGER"},
}
if diff := testutil.Diff(newMeta2.Schema, wantSchema2); diff != "" {
t.Errorf("new schema after autodetect, got=-, want=+\n%s", diff)
}

id, _ := testTable.Identifier(StandardSQLID)
q := client.Query(fmt.Sprintf("SELECT * FROM %s", id))
it, err := q.Read(ctx)
if err != nil {
t.Fatalf("query read: %v", err)
}
wantRows := [][]Value{
{"bar", int64(32), int64(314)},
}
checkReadAndTotalRows(t, "row check", it, wantRows)
}

func TestIntegration_QueryExternalHivePartitioning(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
Expand Down
43 changes: 38 additions & 5 deletions bigquery/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,23 +758,48 @@ func (t *Table) read(ctx context.Context, pf pageFetcher) *RowIterator {
// NeverExpire is a sentinel value used to remove a table'e expiration time.
var NeverExpire = time.Time{}.Add(-1)

// We use this for the option pattern rather than exposing the underlying
// discovery type directly.
type tablePatchCall struct {
call *bq.TablesPatchCall
}

// TableUpdateOption allow requests to update table metadata.
type TableUpdateOption func(*tablePatchCall)

// WithAutoDetectSchema governs whether the schema autodetection occurs as part of the table update.
// This is relevant in cases like external tables where schema is detected from the source data.
func WithAutoDetectSchema(b bool) TableUpdateOption {
return func(tpc *tablePatchCall) {
tpc.call.AutodetectSchema(b)
}
}

// Update modifies specific Table metadata fields.
func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag string) (md *TableMetadata, err error) {
func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag string, opts ...TableUpdateOption) (md *TableMetadata, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Table.Update")
defer func() { trace.EndSpan(ctx, err) }()

bqt, err := tm.toBQ()
if err != nil {
return nil, err
}
call := t.c.bqs.Tables.Patch(t.ProjectID, t.DatasetID, t.TableID, bqt).Context(ctx)
setClientHeader(call.Header())

tpc := &tablePatchCall{
call: t.c.bqs.Tables.Patch(t.ProjectID, t.DatasetID, t.TableID, bqt).Context(ctx),
}

for _, o := range opts {
o(tpc)
}

setClientHeader(tpc.call.Header())
if etag != "" {
call.Header().Set("If-Match", etag)
tpc.call.Header().Set("If-Match", etag)
}
var res *bq.Table
if err := runWithRetry(ctx, func() (err error) {
res, err = call.Do()
res, err = tpc.call.Do()
return err
}); err != nil {
return nil, err
Expand Down Expand Up @@ -807,6 +832,10 @@ func (tm *TableMetadataToUpdate) toBQ() (*bq.Table, error) {
if tm.EncryptionConfig != nil {
t.EncryptionConfiguration = tm.EncryptionConfig.toBQ()
}
if tm.ExternalDataConfig != nil {
cfg := tm.ExternalDataConfig.toBQ()
t.ExternalDataConfiguration = &cfg
}

if tm.Clustering != nil {
t.Clustering = tm.Clustering.toBQ()
Expand Down Expand Up @@ -893,6 +922,10 @@ type TableMetadataToUpdate struct {
// set ExpirationTime to NeverExpire. The zero value is ignored.
ExpirationTime time.Time

// ExternalDataConfig controls the definition of a table defined against
// an external source, such as one based on files in Google Cloud Storage.
ExternalDataConfig *ExternalDataConfig

// The query to use for a view.
ViewQuery optional.String

Expand Down

0 comments on commit c72e34f

Please sign in to comment.