Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Backport lponly feature to 1.8 #19611

Merged
merged 1 commit into from
Sep 22, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 83 additions & 34 deletions cmd/influx_inspect/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"flag"
"fmt"
"io"
"io/ioutil"
"math"
"os"
"path/filepath"
Expand Down Expand Up @@ -36,6 +37,7 @@ type Command struct {
startTime int64
endTime int64
compress bool
lponly bool

manifest map[string]struct{}
tsmFiles map[string][]string
Expand Down Expand Up @@ -65,6 +67,7 @@ func (cmd *Command) Run(args ...string) error {
fs.StringVar(&cmd.retentionPolicy, "retention", "", "Optional: the retention policy to export (requires -database)")
fs.StringVar(&start, "start", "", "Optional: the start time to export (RFC3339 format)")
fs.StringVar(&end, "end", "", "Optional: the end time to export (RFC3339 format)")
fs.BoolVar(&cmd.lponly, "lponly", false, "Only export line protocol")
fs.BoolVar(&cmd.compress, "compress", false, "Compress the output")

fs.SetOutput(cmd.Stdout)
Expand Down Expand Up @@ -123,6 +126,7 @@ func (cmd *Command) export() error {
if err := cmd.walkWALFiles(); err != nil {
return err
}

return cmd.write()
}

Expand Down Expand Up @@ -187,6 +191,68 @@ func (cmd *Command) walkWALFiles() error {
})
}

func (cmd *Command) writeDDL(mw io.Writer, w io.Writer) error {
// Write out all the DDL
fmt.Fprintln(mw, "# DDL")
for key := range cmd.manifest {
keys := strings.Split(key, string(os.PathSeparator))
db, rp := influxql.QuoteIdent(keys[0]), influxql.QuoteIdent(keys[1])
fmt.Fprintf(w, "CREATE DATABASE %s WITH NAME %s\n", db, rp)
}

return nil
}

func (cmd *Command) writeDML(mw io.Writer, w io.Writer) error {
fmt.Fprintln(mw, "# DML")
for key := range cmd.manifest {
keys := strings.Split(key, string(os.PathSeparator))
fmt.Fprintf(mw, "# CONTEXT-DATABASE:%s\n", keys[0])
fmt.Fprintf(mw, "# CONTEXT-RETENTION-POLICY:%s\n", keys[1])
if files, ok := cmd.tsmFiles[key]; ok {
fmt.Fprintf(cmd.Stdout, "writing out tsm file data for %s...", key)
if err := cmd.writeTsmFiles(mw, w, files); err != nil {
return err
}
fmt.Fprintln(cmd.Stdout, "complete.")
}
if _, ok := cmd.walFiles[key]; ok {
fmt.Fprintf(cmd.Stdout, "writing out wal file data for %s...", key)
if err := cmd.writeWALFiles(mw, w, cmd.walFiles[key], key); err != nil {
return err
}
fmt.Fprintln(cmd.Stdout, "complete.")
}
}

return nil
}

// writeFull writes the full DML and DDL to the supplied io.Writers. mw is the
// "meta" writer where comments and other informational writes go and w is for
// the actual payload of the writes -- DML and DDL.
//
// Typically mw and w are the same but if we'd like to, for example, filter out
// comments and other meta data, we can pass ioutil.Discard to mw to only
// include the raw data that writeFull() generates.
func (cmd *Command) writeFull(mw io.Writer, w io.Writer) error {
s, e := time.Unix(0, cmd.startTime).Format(time.RFC3339), time.Unix(0, cmd.endTime).Format(time.RFC3339)

fmt.Fprintf(mw, "# INFLUXDB EXPORT: %s - %s\n", s, e)

if shouldWriteDDL := !cmd.lponly; shouldWriteDDL {
if err := cmd.writeDDL(mw, w); err != nil {
return err
}
}

if err := cmd.writeDML(mw, w); err != nil {
return err
}

return nil
}

func (cmd *Command) write() error {
// open our output file and create an output buffer
f, err := os.Create(cmd.out)
Expand All @@ -209,42 +275,25 @@ func (cmd *Command) write() error {
w = gzw
}

s, e := time.Unix(0, cmd.startTime).Format(time.RFC3339), time.Unix(0, cmd.endTime).Format(time.RFC3339)
fmt.Fprintf(w, "# INFLUXDB EXPORT: %s - %s\n", s, e)

// Write out all the DDL
fmt.Fprintln(w, "# DDL")
for key := range cmd.manifest {
keys := strings.Split(key, string(os.PathSeparator))
db, rp := influxql.QuoteIdent(keys[0]), influxql.QuoteIdent(keys[1])
fmt.Fprintf(w, "CREATE DATABASE %s WITH NAME %s\n", db, rp)
// mw is our "meta writer" -- the io.Writer to which meta/out-of-band data
// like comments will be sent. If the lponly flag is set, mw will be
// ioutil.Discard which effectively filters out comments and any other
// non-line protocol data.
//
// Otherwise, mw is set to the same writer as the actual DDL and line
// protocol DML which will cause the comments to be intermixed with the
// data..
//
mw := w
if cmd.lponly {
mw = ioutil.Discard
}

fmt.Fprintln(w, "# DML")
for key := range cmd.manifest {
keys := strings.Split(key, string(os.PathSeparator))
fmt.Fprintf(w, "# CONTEXT-DATABASE:%s\n", keys[0])
fmt.Fprintf(w, "# CONTEXT-RETENTION-POLICY:%s\n", keys[1])
if files, ok := cmd.tsmFiles[key]; ok {
fmt.Fprintf(cmd.Stdout, "writing out tsm file data for %s...", key)
if err := cmd.writeTsmFiles(w, files); err != nil {
return err
}
fmt.Fprintln(cmd.Stdout, "complete.")
}
if _, ok := cmd.walFiles[key]; ok {
fmt.Fprintf(cmd.Stdout, "writing out wal file data for %s...", key)
if err := cmd.writeWALFiles(w, cmd.walFiles[key], key); err != nil {
return err
}
fmt.Fprintln(cmd.Stdout, "complete.")
}
}
return nil
return cmd.writeFull(mw, w)
}

func (cmd *Command) writeTsmFiles(w io.Writer, files []string) error {
fmt.Fprintln(w, "# writing tsm data")
func (cmd *Command) writeTsmFiles(mw io.Writer, w io.Writer, files []string) error {
fmt.Fprintln(mw, "# writing tsm data")

// we need to make sure we write the same order that the files were written
sort.Strings(files)
Expand Down Expand Up @@ -298,8 +347,8 @@ func (cmd *Command) exportTSMFile(tsmFilePath string, w io.Writer) error {
return nil
}

func (cmd *Command) writeWALFiles(w io.Writer, files []string, key string) error {
fmt.Fprintln(w, "# writing wal data")
func (cmd *Command) writeWALFiles(mw io.Writer, w io.Writer, files []string, key string) error {
fmt.Fprintln(mw, "# writing wal data")

// we need to make sure we write the same order that the wal received the data
sort.Strings(files)
Expand Down