Skip to content

Commit

Permalink
merges con.Raw() in a func and refactors connectionString()
Browse files Browse the repository at this point in the history
  • Loading branch information
mii9000 committed Mar 13, 2024
1 parent 9c9e0b3 commit 03592d4
Showing 1 changed file with 41 additions and 51 deletions.
92 changes: 41 additions & 51 deletions pkg/driver/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"io"
"net/url"
"reflect"
"strconv"
"strings"
"sync"
"unsafe"
Expand Down Expand Up @@ -106,44 +105,27 @@ func (drv *Driver) CreateMigrationsTable(db *sql.DB) error {
client := getClient(driverConn)
dataset := getDataset(driverConn)
exists, err = tableExists(client, dataset, drv.migrationsTableName)

if err != nil {
return err
}

return nil
})

wg.Wait()

if err != nil {
return err
}

//if already exists then return early
if exists {
return nil
}

wg.Add(1)

//if here then does not exist so create table
err = con.Raw(func(driverConn any) error {
defer wg.Done()

client := getClient(driverConn)
dataset := getDataset(driverConn)
table := client.Dataset(dataset).Table(drv.migrationsTableName)
err := table.Create(ctx, &bigquery.TableMetadata{
Schema: bigquery.Schema{
{
Name: "version",
Type: bigquery.StringFieldType,
if !exists {
table := client.Dataset(dataset).Table(drv.migrationsTableName)
err := table.Create(ctx, &bigquery.TableMetadata{
Schema: bigquery.Schema{
{
Name: "version",
Type: bigquery.StringFieldType,
},
},
},
})
if err != nil {
return err
})

if err != nil {
return err
}
}

return nil
})

Expand Down Expand Up @@ -683,35 +665,43 @@ func tableExists(client *bigquery.Client, datasetID, tableName string) (bool, er
}

func connectionString(u *url.URL) string {
var params, locationPath string
params := url.Values{}

fields := strings.Split(strings.TrimPrefix(u.Path, "/"), "/")
projectID := u.Hostname()
dataset := fields[len(fields)-1]
disableAuth := u.Query().Get("disable_auth") == "true"
paths := strings.Split(strings.TrimPrefix(u.Path, "/"), "/")

if disableAuth {
params += fmt.Sprintf("?disable_auth=%s", strconv.FormatBool(disableAuth))
if u.Query().Get("disable_auth") == "true" {
params.Set("disable_auth", "true")
}

formattedURL := &url.URL{}
if u.Port() != "" {
if disableAuth {
params += "&"
formattedURL, _ = url.Parse(fmt.Sprintf("bigquery://%s", paths[0]))
params.Set("endpoint", fmt.Sprintf("http://%s:%s", u.Hostname(), u.Port()))
if len(paths) == 3 {
// bigquery://host:port/project/location/dataset
formattedURL.Path += "/" + paths[1]
formattedURL.Path += "/" + paths[2]
} else {
params += "?"
}
params += "endpoint=" + url.QueryEscape(fmt.Sprintf("http://%s:%s", u.Hostname(), u.Port()))
projectID = fields[0]
if len(fields) == 3 {
locationPath = "/" + fields[1]
// bigquery://host:port/project/dataset
formattedURL.Path += "/" + paths[1]
}
} else {
if len(fields) == 2 {
locationPath = "/" + fields[0]
formattedURL, _ = url.Parse(fmt.Sprintf("bigquery://%s", u.Hostname()))
if len(paths) == 2 {
// bigquery://project/location/dataset
formattedURL.Path += "/" + paths[0]
formattedURL.Path += "/" + paths[1]
} else {
// bigquery://project/dataset
formattedURL.Path += "/" + paths[0]
}
}

return fmt.Sprintf("bigquery://%s%s/%s%s", projectID, locationPath, dataset, params)
if len(params) != 0 {
formattedURL.RawQuery = params.Encode()
}

return formattedURL.String()
}

func getClient(con any) *bigquery.Client {
Expand Down

0 comments on commit 03592d4

Please sign in to comment.