Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

init command #279

Merged
merged 1 commit into from
Feb 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Features
- RethinkDB SSL support added in [#268](https://github.com/compose/transporter/pull/268)
- `transporter init [source] [sink]` command added in [#279](https://github.com/compose/transporter/pull/279)

### Bugfixes

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Run
- run `transporter run --config ./test/config.yaml ./test/application.js`
- eval `transporter eval --config ./test/config.yaml 'Source({name:"localmongo", namespace: "boom.foo"}).save({name:"tofile"})' `
- test `transporter test --config ./test/config.yaml test/application.js `
- init `transporter init mongodb mongodb`

Complete beginners guide
---
Expand Down
3 changes: 3 additions & 0 deletions cmd/transporter/about.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
func runAbout(args []string) error {
flagset := baseFlagSet("about", nil)
flagset.Usage = usageFor(flagset, "transporter about [adaptor]")
if err := flagset.Parse(args); err != nil {
return err
}

args = flagset.Args()
if len(args) > 0 {
Expand Down
50 changes: 50 additions & 0 deletions cmd/transporter/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package main

import (
"fmt"
"os"

"github.com/compose/transporter/pkg/adaptor"
)

func runInit(args []string) error {
flagset := baseFlagSet("init", nil)
flagset.Usage = usageFor(flagset, "transporter init [source] [sink]")
if err := flagset.Parse(args); err != nil {
return err
}

args = flagset.Args()
if len(args) != 2 {
return fmt.Errorf("wrong number of arguments provided, expected 2, got %d", len(args))
}
fmt.Printf("Writing %s...\n", defaultConfigFile)
cfgFileHandle, err := os.Create(defaultConfigFile)
if err != nil {
return err
}
defer cfgFileHandle.Close()
cfgFileHandle.WriteString("nodes:\n")
nodeName := "source"
for _, name := range args {
creator, ok := adaptor.Adaptors[name]
if !ok {
return fmt.Errorf("no adaptor named '%s' exists", name)
}
dummyAdaptor, _ := creator(nil, "", adaptor.Config{"uri": "test", "namespace": "test.test"})
if d, ok := dummyAdaptor.(adaptor.Describable); ok {
cfgFileHandle.WriteString(fmt.Sprintf(" %s:\n%s\n", nodeName, d.SampleConfig()))
nodeName = "sink"
} else {
return fmt.Errorf("adaptor '%s' did not provide a sample config", name)
}
}
fmt.Println("Writing pipeline.js...")
appFileHandle, err := os.Create("pipeline.js")
if err != nil {
return err
}
defer appFileHandle.Close()
appFileHandle.WriteString(`Source({name:"source", namespace:"test./.*/"}).save({name:"sink", namespace:"test./.*/"})\n`)
return nil
}
3 changes: 3 additions & 0 deletions cmd/transporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func usage() {
fmt.Fprintf(os.Stderr, " test display the compiled nodes without starting a pipeline\n")
fmt.Fprintf(os.Stderr, " eval eval javascript to build and run a transporter application\n")
fmt.Fprintf(os.Stderr, " about show information about available adaptors\n")
fmt.Fprintf(os.Stderr, " init initialize a config and pipeline file based from provided adaptors\n")
fmt.Fprintf(os.Stderr, "\n")
fmt.Fprintf(os.Stderr, "VERSION\n")
fmt.Fprintf(os.Stderr, " %s\n", version)
Expand All @@ -51,6 +52,8 @@ func main() {
run = runEval
case "about":
run = runAbout
case "init":
run = runInit
default:
usage()
os.Exit(1)
Expand Down
23 changes: 10 additions & 13 deletions pkg/adaptor/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@ import (

const (
description = "an elasticsearch sink adaptor"
sampleConfig = `
- es:
type: elasticsearch
sampleConfig = ` type: elasticsearch
uri: https://username:password@hostname:port
timeout: 10s # optional, defaults to 30s
aws_access_key: XXX # optional, used for signing requests to AWS Elasticsearch service
aws_access_secret: XXX # optional, used for signing requests to AWS Elasticsearch service
`
# timeout: 10s # defaults to 30s
# aws_access_key: XXX # used for signing requests to AWS Elasticsearch service
# aws_access_secret: XXX # used for signing requests to AWS Elasticsearch service`
)

var (
Expand Down Expand Up @@ -113,11 +110,8 @@ func init() {
return e, adaptor.NewError(adaptor.CRITICAL, path, fmt.Sprintf("can't split namespace into index and typeMatch (%s)", err.Error()), nil)
}

if err := e.setupClient(conf); err != nil {
return nil, err
}

return e, nil
err = e.setupClient(conf)
return e, err
})
}

Expand Down Expand Up @@ -215,7 +209,10 @@ func (e *Elasticsearch) setupClient(conf Config) error {
}

func determineVersion(uri string, user *url.Userinfo) (string, error) {
req, _ := http.NewRequest(http.MethodGet, uri, nil)
req, err := http.NewRequest(http.MethodGet, uri, nil)
if err != nil {
return "", err
}
if user != nil {
if pwd, ok := user.Password(); ok {
req.SetBasicAuth(user.Username(), pwd)
Expand Down
8 changes: 3 additions & 5 deletions pkg/adaptor/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ import (
)

const (
sampleConfig = `
- stdout:
type: file
uri: stdout://
`
sampleConfig = ` type: file
uri: stdout://`

description = "an adaptor that reads / writes files"
)

Expand Down
21 changes: 9 additions & 12 deletions pkg/adaptor/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,15 @@ import (
const (
description = "a mongodb adaptor that functions as both a source and a sink"

sampleConfig = `
- localmongo:
type: mongodb
uri: mongodb://127.0.0.1:27017/test
# timeout: 30s
# tail: false
# ssl: false
# cacerts: ["/path/to/cert.pem"]
# wc: 1
# fsync: false
# bulk: false
`
sampleConfig = ` type: mongodb
uri: ${MONGODB_URI}
# timeout: 30s
# tail: false
# ssl: false
# cacerts: ["/path/to/cert.pem"]
# wc: 1
# fsync: false
# bulk: false`
)

// Config provides configuration options for a mongodb adaptor
Expand Down
22 changes: 11 additions & 11 deletions pkg/adaptor/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ import (
_ "github.com/lib/pq" // import pq driver
)

const (
sampleConfig = ` type: postgres
uri: ${POSTGRESQL_URI}
# debug: false
# tail: false
# replication_slot: slot`

description = "a postgres adaptor that functions as both a source and a sink"
)

// Postgres is an adaptor to read / write to postgres.
// it works as a source by copying files, and then optionally tailing the oplog
type Postgres struct {
Expand Down Expand Up @@ -52,13 +62,9 @@ type Postgres struct {
type Config struct {
URI string `json:"uri" doc:"the uri to connect to, in the form 'user=my-user password=my-password dbname=dbname sslmode=require'"`
Namespace string `json:"namespace" doc:"mongo namespace to read/write"`
Timeout string `json:"timeout" doc:"timeout for establishing connection, format must be parsable by time.ParseDuration and defaults to 10s"`
Debug bool `json:"debug" doc:"display debug information"`
Tail bool `json:"tail" doc:"if tail is true, then the postgres source will tail the oplog after copying the namespace"`
ReplicationSlot string `json:"replication_slot" doc:"required if tail is true; sets the replication slot to use for logical decoding"`
Wc int `json:"wc" doc:"The write concern to use for writes, Int, indicating the minimum number of servers to write to before returning success/failure"`
FSync bool `json:"fsync" doc:"When writing, should we flush to disk before returning success"`
Bulk bool `json:"bulk" doc:"use a buffer to bulk insert documents"`
}

func init() {
Expand Down Expand Up @@ -100,15 +106,9 @@ func init() {

// Description for postgres adaptor
func (p *Postgres) Description() string {
return "a postgres adaptor that functions as both a source and a sink"
return description
}

const sampleConfig = `
- localpostgres:
type: postgres
uri: postgres://127.0.0.1:5432/test
`

// SampleConfig for postgres adaptor
func (p *Postgres) SampleConfig() string {
return sampleConfig
Expand Down
15 changes: 6 additions & 9 deletions pkg/adaptor/rethinkdb/rethinkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,12 @@ import (
)

const (
sampleConfig = `
- rethink:
type: rethinkdb
uri: rethink://127.0.0.1:28015
# timeout: 30s
# tail: false
# ssl: false
# cacerts: ["/path/to/cert.pem"]
`
sampleConfig = ` type: rethinkdb
uri: ${RETHINKDB_URI}
# timeout: 30s
# tail: false
# ssl: false
# cacerts: ["/path/to/cert.pem"]`

description = "a rethinkdb adaptor that functions as both a source and a sink"
)
Expand Down