Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion cmd/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"github.com/cloudspannerecosystem/harbourbridge/common/utils"
"github.com/cloudspannerecosystem/harbourbridge/conversion"
"github.com/cloudspannerecosystem/harbourbridge/internal"
"github.com/cloudspannerecosystem/harbourbridge/logger"
"github.com/cloudspannerecosystem/harbourbridge/profiles"
"github.com/cloudspannerecosystem/harbourbridge/proto/migration"
"github.com/google/subcommands"
"github.com/google/uuid"
"go.uber.org/zap"
)

// DataCmd struct with flags.
Expand All @@ -42,6 +44,8 @@ type DataCmd struct {
sessionJSON string
filePrefix string // TODO: move filePrefix to global flags
writeLimit int64
dryRun bool
logLevel string
}

// Name returns the name of operation.
Expand Down Expand Up @@ -75,6 +79,7 @@ func (cmd *DataCmd) SetFlags(f *flag.FlagSet) {
f.BoolVar(&cmd.skipForeignKeys, "skip-foreign-keys", false, "Skip creating foreign keys after data migration is complete (ddl statements for foreign keys can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)")
f.StringVar(&cmd.filePrefix, "prefix", "", "File prefix for generated files")
f.Int64Var(&cmd.writeLimit, "write-limit", defaultWritersLimit, "Write limit for writes to spanner")
f.BoolVar(&cmd.dryRun, "dry-run", false, "To validate the syntax of the command by running it in an air-gapped manner, such that no network calls are made.")
}

func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
Expand All @@ -83,9 +88,20 @@ func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface
var err error
defer func() {
if err != nil {
fmt.Printf("FATAL error: %v\n", err)
logger.Log.Fatal("FATAL error", zap.Error(err))
}
}()
if cmd.dryRun {
fmt.Print("--dry-run flag is not implemented")
return subcommands.ExitFailure
}
err = logger.InitializeLogger(cmd.logLevel)
if err != nil {
fmt.Println("Error initialising logger, did you specify a valid log-level? [DEBUG, INFO, WARN, ERROR, FATAL]", err)
return subcommands.ExitFailure
}
defer logger.Log.Sync()

conv := internal.MakeConv()

sourceProfile, err := profiles.NewSourceProfile(cmd.sourceProfile, cmd.source)
Expand Down
12 changes: 11 additions & 1 deletion cmd/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
"github.com/cloudspannerecosystem/harbourbridge/common/utils"
"github.com/cloudspannerecosystem/harbourbridge/conversion"
"github.com/cloudspannerecosystem/harbourbridge/internal"
"github.com/cloudspannerecosystem/harbourbridge/logger"
"github.com/cloudspannerecosystem/harbourbridge/profiles"
"github.com/google/subcommands"
"go.uber.org/zap"
)

// SchemaCmd struct with flags.
Expand All @@ -37,6 +39,7 @@ type SchemaCmd struct {
target string
targetProfile string
filePrefix string // TODO: move filePrefix to global flags
logLevel string
}

// Name returns the name of operation.
Expand Down Expand Up @@ -67,6 +70,7 @@ func (cmd *SchemaCmd) SetFlags(f *flag.FlagSet) {
f.StringVar(&cmd.target, "target", "Spanner", "Specifies the target DB, defaults to Spanner (accepted values: `Spanner`)")
f.StringVar(&cmd.targetProfile, "target-profile", "", "Flag for specifying connection profile for target database e.g., \"dialect=postgresql\"")
f.StringVar(&cmd.filePrefix, "prefix", "", "File prefix for generated files")
f.StringVar(&cmd.logLevel, "log-level", "INFO", "Configure the logging level for the command (INFO, DEBUG), defaults to INFO")
}

func (cmd *SchemaCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
Expand All @@ -75,9 +79,15 @@ func (cmd *SchemaCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interfa
var err error
defer func() {
if err != nil {
fmt.Printf("FATAL error: %v\n", err)
logger.Log.Fatal("FATAL error", zap.Error(err))
}
}()
err = logger.InitializeLogger(cmd.logLevel)
if err != nil {
fmt.Println("Error initialising logger, did you specify a valid log-level? [DEBUG, INFO, WARN, ERROR, FATAL]", err)
return subcommands.ExitFailure
}
defer logger.Log.Sync()

sourceProfile, err := profiles.NewSourceProfile(cmd.sourceProfile, cmd.source)
if err != nil {
Expand Down
18 changes: 17 additions & 1 deletion cmd/schema_and_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"github.com/cloudspannerecosystem/harbourbridge/common/utils"
"github.com/cloudspannerecosystem/harbourbridge/conversion"
"github.com/cloudspannerecosystem/harbourbridge/internal"
"github.com/cloudspannerecosystem/harbourbridge/logger"
"github.com/cloudspannerecosystem/harbourbridge/profiles"
"github.com/cloudspannerecosystem/harbourbridge/proto/migration"
"github.com/google/subcommands"
"github.com/google/uuid"
"go.uber.org/zap"
)

// SchemaAndDataCmd struct with flags.
Expand All @@ -41,6 +43,8 @@ type SchemaAndDataCmd struct {
skipForeignKeys bool
filePrefix string // TODO: move filePrefix to global flags
writeLimit int64
dryRun bool
logLevel string
}

// Name returns the name of operation.
Expand Down Expand Up @@ -73,6 +77,8 @@ func (cmd *SchemaAndDataCmd) SetFlags(f *flag.FlagSet) {
f.BoolVar(&cmd.skipForeignKeys, "skip-foreign-keys", false, "Skip creating foreign keys after data migration is complete (ddl statements for foreign keys can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)")
f.StringVar(&cmd.filePrefix, "prefix", "", "File prefix for generated files")
f.Int64Var(&cmd.writeLimit, "write-limit", defaultWritersLimit, "Write limit for writes to spanner")
f.BoolVar(&cmd.dryRun, "dry-run", false, "To validate the syntax of the command by running it in an air-gapped manner, such that no network calls are made.")
f.StringVar(&cmd.logLevel, "log-level", "INFO", "Configure the logging level for the command (INFO, DEBUG), defaults to INFO")
}

func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
Expand All @@ -81,9 +87,19 @@ func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...
var err error
defer func() {
if err != nil {
fmt.Printf("FATAL error: %v\n", err)
logger.Log.Fatal("FATAL error", zap.Error(err))
}
}()
if cmd.dryRun {
fmt.Print("--dry-run flag is not implemented")
return subcommands.ExitFailure
}
err = logger.InitializeLogger(cmd.logLevel)
if err != nil {
fmt.Println("Error initialising logger, did you specify a valid log-level? [DEBUG, INFO, WARN, ERROR, FATAL]", err)
return subcommands.ExitFailure
}
defer logger.Log.Sync()

sourceProfile, err := profiles.NewSourceProfile(cmd.sourceProfile, cmd.source)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions common/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,12 @@ func getInstances(ctx context.Context, project string) ([]string, error) {
}

func GetPassword() string {
calledFromGCloud := os.Getenv("GCLOUD_HB_PLUGIN")
if strings.EqualFold(calledFromGCloud, "true") {
fmt.Println("\n Please specify password in enviroment variables (recommended) or --source-profile " +
"(not recommended) while using HarbourBridge from gCloud CLI.")
return ""
}
fmt.Print("Enter Password: ")
bytePassword, err := terminal.ReadPassword(int(syscall.Stdin))
if err != nil {
Expand Down
13 changes: 9 additions & 4 deletions conversion/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,11 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
dydb "github.com/aws/aws-sdk-go/service/dynamodb"
adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"

"github.com/cloudspannerecosystem/harbourbridge/common/constants"
"github.com/cloudspannerecosystem/harbourbridge/common/metrics"
"github.com/cloudspannerecosystem/harbourbridge/common/utils"
"github.com/cloudspannerecosystem/harbourbridge/internal"
"github.com/cloudspannerecosystem/harbourbridge/logger"
"github.com/cloudspannerecosystem/harbourbridge/profiles"
"github.com/cloudspannerecosystem/harbourbridge/sources/common"
"github.com/cloudspannerecosystem/harbourbridge/sources/csv"
Expand All @@ -61,6 +58,10 @@ import (
"github.com/cloudspannerecosystem/harbourbridge/sources/sqlserver"
"github.com/cloudspannerecosystem/harbourbridge/spanner/ddl"
"github.com/cloudspannerecosystem/harbourbridge/spanner/writer"
"go.uber.org/zap"
adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
)

var (
Expand Down Expand Up @@ -368,6 +369,7 @@ func getSeekable(f *os.File) (*os.File, int64, error) {
return f, n, err
}
internal.VerbosePrintln("Creating a tmp file with a copy of stdin because stdin is not seekable.")
logger.Log.Debug("Creating a tmp file with a copy of stdin because stdin is not seekable.")

// Create file in os.TempDir. Its not clear this is a good idea e.g. if the
// pg_dump/mysqldump output is large (tens of GBs) and os.TempDir points to a directory
Expand Down Expand Up @@ -564,6 +566,8 @@ Recommended value is between 20-30.`)
workers <- workerID
}()
internal.VerbosePrintf("Submitting new FK create request: %s\n", fkStmt)
logger.Log.Debug("Submitting new FK create request", zap.String("fkStmt", fkStmt))

op, err := adminClient.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: dbURI,
Statements: []string{fkStmt},
Expand All @@ -579,6 +583,7 @@ Recommended value is between 20-30.`)
return
}
internal.VerbosePrintln("Updated schema with statement: " + fkStmt)
logger.Log.Debug("Updated schema with statement", zap.String("fkStmt", fkStmt))
}(fkStmt, workerID)
}
// Wait for all the goroutines to finish.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
github.com/pingcap/tidb/parser v0.0.0-20220411093434-32b9c14779c2
github.com/sijms/go-ora/v2 v2.2.17
github.com/stretchr/testify v1.7.0
go.uber.org/zap v1.21.0
golang.org/x/crypto v0.0.0-20220214200702-86341886e292
golang.org/x/net v0.0.0-20220325170049-de3da57026de
golang.org/x/tools v0.1.11-0.20220316014157-77aa08bb151a // indirect
Expand Down
8 changes: 8 additions & 0 deletions internal/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"fmt"
"time"

"github.com/cloudspannerecosystem/harbourbridge/logger"
"github.com/cloudspannerecosystem/harbourbridge/proto/migration"
"github.com/cloudspannerecosystem/harbourbridge/schema"
"github.com/cloudspannerecosystem/harbourbridge/spanner/ddl"
"go.uber.org/zap"
)

// Conv contains all schema and data conversion state.
Expand Down Expand Up @@ -204,6 +206,8 @@ func (conv *Conv) WriteRow(srcTable, spTable string, spCols []string, spVals []i
if conv.dataSink == nil {
msg := "Internal error: ProcessDataRow called but dataSink not configured"
VerbosePrintf("%s\n", msg)
logger.Log.Debug("Internal error: ProcessDataRow called but dataSink not configured")

conv.Unexpected(msg)
conv.StatsAddBadRow(srcTable, conv.DataMode())
} else {
Expand Down Expand Up @@ -342,6 +346,8 @@ func (conv *Conv) buildPrimaryKey(spTable string) string {
// because we process dump data twice.
func (conv *Conv) Unexpected(u string) {
VerbosePrintf("Unexpected condition: %s\n", u)
logger.Log.Debug("Unexpected condition", zap.String("condition", u))

// Limit size of unexpected map. If over limit, then only
// update existing entries.
if _, ok := conv.Stats.Unexpected[u]; ok || len(conv.Stats.Unexpected) < 1000 {
Expand Down Expand Up @@ -390,6 +396,7 @@ func (conv *Conv) getStatementStat(s string) *statementStat {
func (conv *Conv) SkipStatement(stmtType string) {
if conv.SchemaMode() { // Record statement stats on first pass only.
VerbosePrintf("Skipping statement: %s\n", stmtType)
logger.Log.Debug("Skipping statement", zap.String("stmtType", stmtType))
conv.getStatementStat(stmtType).Skip++
}
}
Expand All @@ -398,6 +405,7 @@ func (conv *Conv) SkipStatement(stmtType string) {
func (conv *Conv) ErrorInStatement(stmtType string) {
if conv.SchemaMode() { // Record statement stats on first pass only.
VerbosePrintf("Error processing statement: %s\n", stmtType)
logger.Log.Debug("Error processing statement", zap.String("stmtType", stmtType))
conv.getStatementStat(stmtType).Error++
}
}
Expand Down
6 changes: 5 additions & 1 deletion internal/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/cloudspannerecosystem/harbourbridge/logger"
"github.com/cloudspannerecosystem/harbourbridge/spanner/ddl"
)

// This file contains very basic tests of Conv API functionality.
// Most of the Conv APIs are also tested in process_test.go (where
// they are tested using data from schema/data conversion).

func init() {
logger.Log = zap.NewNop()
}
func TestSetSchemaMode(t *testing.T) {
conv := MakeConv()
conv.SetSchemaMode()
Expand Down
3 changes: 3 additions & 0 deletions internal/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strconv"
"strings"

"github.com/cloudspannerecosystem/harbourbridge/logger"
"github.com/cloudspannerecosystem/harbourbridge/spanner/ddl"
)

Expand Down Expand Up @@ -46,6 +47,7 @@ func GetSpannerTable(conv *Conv, srcTable string) (string, error) {
spTable := getSpannerID(conv, srcTable)
if spTable != srcTable {
VerbosePrintf("Mapping source DB table %s to Spanner table %s\n", srcTable, spTable)
logger.Log.Debug(fmt.Sprintf("Mapping source DB table %s to Spanner table %s\n", srcTable, spTable))
}
conv.ToSpanner[srcTable] = NameAndCols{Name: spTable, Cols: make(map[string]string)}
conv.ToSource[spTable] = NameAndCols{Name: srcTable, Cols: make(map[string]string)}
Expand Down Expand Up @@ -118,6 +120,7 @@ func GetSpannerCol(conv *Conv, srcTable, srcCol string, mustExist bool) (string,
}
if spCol != srcCol {
VerbosePrintf("Mapping source DB col %s (table %s) to Spanner col %s\n", srcCol, srcTable, spCol)
logger.Log.Debug(fmt.Sprintf("Mapping source DB col %s (table %s) to Spanner col %s\n", srcCol, srcTable, spCol))
}
conv.ToSpanner[srcTable].Cols[srcCol] = spCol
conv.ToSource[sp.Name].Cols[spCol] = srcCol
Expand Down
5 changes: 5 additions & 0 deletions internal/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"fmt"
"strconv"
"strings"

"github.com/cloudspannerecosystem/harbourbridge/logger"
"go.uber.org/zap"
)

// Progress provides console progress functionality. i.e. it reports what
Expand Down Expand Up @@ -87,6 +90,7 @@ func (p *Progress) reportPct(firstCall bool) {
fmt.Printf("%s: %2d%%\n", p.message, p.pct)
return
}
logger.Log.Debug(p.message, zap.Int("Progress", p.pct))
if firstCall {
fmt.Printf("%s: %2d%%", p.message, p.pct)
} else {
Expand All @@ -102,6 +106,7 @@ func (p *Progress) reportFraction(firstCall bool) {
fmt.Printf("%s: %d/%d\n", p.message, p.progress, p.total)
return
}
logger.Log.Debug(p.message, zap.Float32("Progress", float32(p.progress/p.total)))
if firstCall {
fmt.Printf("%s: %d/%d", p.message, p.progress, p.total)
} else {
Expand Down
33 changes: 33 additions & 0 deletions logger/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package logger

import (
"os"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

const LOG_FILE_NAME = "harbourbridge.log"

var Log *zap.Logger

func InitializeLogger(inputLogLevel string) error {
config := zap.NewProductionEncoderConfig()
config.EncodeTime = zapcore.ISO8601TimeEncoder
fileEncoder := zapcore.NewJSONEncoder(config)
consoleEncoder := zapcore.NewConsoleEncoder(config)
logFile, _ := os.OpenFile(LOG_FILE_NAME, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
writer := zapcore.AddSync(logFile)
zapLogLevel := new(zapcore.Level)
err := zapLogLevel.Set(inputLogLevel)
if err != nil {
return err
}
logLevel := zap.NewAtomicLevelAt(*zapLogLevel)
core := zapcore.NewTee(
zapcore.NewCore(fileEncoder, writer, logLevel),
zapcore.NewCore(consoleEncoder, zapcore.AddSync(os.Stdout), logLevel),
)
Log = zap.New(core, zap.AddCaller(), zap.AddStacktrace(zapcore.ErrorLevel))
return nil
}
6 changes: 6 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cloudspannerecosystem/harbourbridge/common/constants"
"github.com/cloudspannerecosystem/harbourbridge/common/utils"
"github.com/cloudspannerecosystem/harbourbridge/internal"
"github.com/cloudspannerecosystem/harbourbridge/logger"
"github.com/cloudspannerecosystem/harbourbridge/web"
"github.com/cloudspannerecosystem/harbourbridge/webv2"
"github.com/google/subcommands"
Expand Down Expand Up @@ -119,6 +120,11 @@ func main() {
fmt.Printf("\nWarning: Found usage of deprecated flags. Support for these " +
"flags will be discontinued soon.\nIt is recommended to use Harbourbridge " +
"using connection profiles. Checkout usage here: https://github.com/cloudspannerecosystem/harbourbridge/tree/master/cmd#command-line-flags\n\n")
err = logger.InitializeLogger("INFO")
if err != nil {
panic(fmt.Errorf("error initialising logger"))
}
defer logger.Log.Sync()
// Running HB CLI in global command line mode.
setupGlobalFlags()
flag.Usage = usage
Expand Down
Loading