Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tsdb): Add block exporter. #14233

Merged
merged 1 commit into from
Jul 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
feat(tsdb): Add block exporter.
Adds export tooling to `influxd inspect export-blocks` so that we
can dump out block data in SQL format for better analysis during
the debugging process.
  • Loading branch information
benbjohnson committed Jul 1, 2019
commit 08e24faf4c96102c5b421596fe647b2494b6990c
30 changes: 30 additions & 0 deletions cmd/influxd/inspect/export_blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package inspect

import (
"os"

"github.com/influxdata/influxdb/tsdb/tsm1"
"github.com/spf13/cobra"
)

// NewExportBlocksCommand returns the `export-blocks` sub-command.
//
// Every positional argument is treated as the path of a TSM1 file and is
// exported, in order, as SQL to standard output. The first file that fails
// aborts the run and its error is returned to cobra.
func NewExportBlocksCommand() *cobra.Command {
	run := func(cmd *cobra.Command, args []string) error {
		exporter := tsm1.NewSQLBlockExporter(os.Stdout)
		for i := range args {
			if err := exporter.ExportFile(args[i]); err != nil {
				return err
			}
		}
		// Close's result is the command's result once all files exported.
		return exporter.Close()
	}

	return &cobra.Command{
		Use:   `export-blocks`,
		Short: "Exports block data",
		Long: `
This command will export all blocks in one or more TSM1 files to
another format for easier inspection and debugging.`,
		RunE: run,
	}
}
1 change: 1 addition & 0 deletions cmd/influxd/inspect/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func NewCommand() *cobra.Command {
// List of available sub-commands
// If a new sub-command is created, it must be added here
subCommands := []*cobra.Command{
NewExportBlocksCommand(),
NewReportTSMCommand(),
NewVerifyWALCommand(),
}
Expand Down
5 changes: 5 additions & 0 deletions tsdb/explode.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ func DecodeName(name [16]byte) (org, bucket platform.ID) {
return
}

// DecodeNameSlice converts tsdb internal serialization back to organization and bucket IDs.
//
// Bytes 0-7 are read as the big-endian organization ID and bytes 8-15 as the
// big-endian bucket ID. The function does no length validation itself, so
// callers are expected to pass at least 16 bytes.
func DecodeNameSlice(name []byte) (org, bucket platform.ID) {
	orgBits := binary.BigEndian.Uint64(name[0:8])
	bucketBits := binary.BigEndian.Uint64(name[8:16])
	return platform.ID(orgBits), platform.ID(bucketBits)
}

// EncodeName converts org/bucket pairs to the tsdb internal serialization
func EncodeName(org, bucket platform.ID) [16]byte {
var nameBytes [16]byte
Expand Down
173 changes: 173 additions & 0 deletions tsdb/tsm1/block_exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package tsm1

import (
"errors"
"fmt"
"io"
"os"
"strings"
"unicode/utf8"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb"
)

// BlockExporter writes all blocks in a file to a given format.
//
// ExportFile may be called once per input file; Close is called after the
// last file to end the export.
type BlockExporter interface {
// Close (from io.Closer) ends the export.
io.Closer
// ExportFile exports every block of the file at filename.
ExportFile(filename string) error
}

// Compile-time assertion that *SQLBlockExporter implements BlockExporter;
// the blank identifier means no value is retained at runtime.
var _ BlockExporter = (*SQLBlockExporter)(nil)

// SQLBlockExporter writes out all blocks for TSM files to SQL.
//
// Each exported file is emitted as one transaction containing an INSERT
// statement per block. When ShowSchema is set, the table and index
// definitions are written once, before the first file.
type SQLBlockExporter struct {
// w receives all generated SQL text.
w io.Writer
initialized bool // true when initial block written

// Write schema, if true.
ShowSchema bool
}

// NewSQLBlockExporter returns a new instance of SQLBlockExporter.
//
// The exporter sends its SQL to w and starts with ShowSchema enabled; callers
// that do not want the schema preamble can clear the field before exporting.
func NewSQLBlockExporter(w io.Writer) *SQLBlockExporter {
	e := new(SQLBlockExporter)
	e.w = w
	e.ShowSchema = true
	return e
}

// Close ends the export and writes final output.
//
// The SQL format needs no trailer, so this is currently a no-op that always
// returns nil. It does not close the underlying writer.
func (e *SQLBlockExporter) Close() error {
return nil
}

// ExportFile writes all blocks of the TSM file.
//
// On the first call the exporter is initialized, which writes the schema when
// ShowSchema is set. The file's blocks are then emitted as a single
// transaction: a BEGIN TRANSACTION line, one INSERT per block, and a COMMIT
// line.
//
// Keys of at least 16 bytes are split into an organization ID, a bucket ID
// and the remaining key; shorter keys are exported whole with zero IDs.
//
// An error is returned if the file cannot be opened or read as TSM, if a
// block cannot be read, if any write to the output fails (including the
// BEGIN/COMMIT lines), or if the reader fails to close. After an error the
// output may contain an unterminated transaction.
func (e *SQLBlockExporter) ExportFile(filename string) error {
	if !e.initialized {
		if err := e.initialize(); err != nil {
			return err
		}
	}

	f, err := os.OpenFile(filename, os.O_RDONLY, 0600)
	if err != nil {
		return err
	}
	defer f.Close()

	r, err := NewTSMReader(f)
	if err != nil {
		return err
	}
	// Safety net for the early-return paths; the success path closes the
	// reader explicitly below so that its error can be reported.
	defer r.Close()

	itr := r.BlockIterator()
	if itr == nil {
		return errors.New("invalid TSM file, no block iterator")
	}

	// A failed write here must not be ignored: continuing would emit INSERTs
	// outside of any transaction.
	if _, err := fmt.Fprintln(e.w, `BEGIN TRANSACTION;`); err != nil {
		return err
	}
	for itr.Next() {
		key, minTime, maxTime, typ, checksum, buf, err := itr.Read()
		if err != nil {
			return err
		}

		// Extract organization & bucket ID.
		var record blockExportRecord
		record.Filename = filename
		if len(key) < 16 {
			// Too short to carry the 16-byte org/bucket prefix.
			record.Key = string(key)
		} else {
			record.OrgID, record.BucketID = tsdb.DecodeNameSlice(key[:16])
			record.Key = string(key[16:])
		}
		record.Type = typ
		record.MinTime = minTime
		record.MaxTime = maxTime
		record.Checksum = checksum
		record.Count = BlockCount(buf)

		if err := e.write(&record); err != nil {
			return err
		}
	}
	// Without a successfully written COMMIT the transaction is incomplete, so
	// surface the failure instead of reporting success.
	if _, err := fmt.Fprintln(e.w, "COMMIT;"); err != nil {
		return err
	}

	if err := r.Close(); err != nil {
		return fmt.Errorf("tsm1.SQLBlockExporter: cannot close reader: %s", err)
	}

	return nil
}

// initialize writes the one-time preamble and marks the exporter as
// initialized.
//
// When ShowSchema is set, the blocks table and its two indexes are written to
// the output. All three statements use IF NOT EXISTS so that the schema does
// not fail when it already exists in the target database.
//
// If the schema cannot be written the error is returned and the exporter
// stays uninitialized, so a later ExportFile call will attempt it again.
func (e *SQLBlockExporter) initialize() error {
	if e.ShowSchema {
		// The [1:] drops the newline that follows the opening backquote.
		if _, err := fmt.Fprintln(e.w, `
CREATE TABLE IF NOT EXISTS blocks (
filename TEXT NOT NULL,
org_id INTEGER NOT NULL,
bucket_id INTEGER NOT NULL,
key TEXT NOT NULL,
"type" TEXT NOT NULL,
min_time INTEGER NOT NULL,
max_time INTEGER NOT NULL,
checksum INTEGER NOT NULL,
count INTEGER NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_blocks_filename ON blocks (filename);
CREATE INDEX IF NOT EXISTS idx_blocks_org_id_bucket_id_key ON blocks (org_id, bucket_id, key);
`[1:]); err != nil {
			return err
		}
	}

	e.initialized = true

	return nil
}

// write emits a single INSERT statement for record, terminated by a newline.
//
// The filename, key and block type name are passed through quoteSQL; the
// remaining columns are written as plain integers. Any error from the
// underlying writer is returned.
func (e *SQLBlockExporter) write(record *blockExportRecord) error {
	const insertFormat = "INSERT INTO blocks (filename, org_id, bucket_id, key, type, min_time, max_time, checksum, count) VALUES (%s, %d, %d, %s, %s, %d, %d, %d, %d);\n"

	filename := quoteSQL(record.Filename)
	key := quoteSQL(record.Key)
	typeName := quoteSQL(BlockTypeName(record.Type))

	if _, err := fmt.Fprintf(
		e.w, insertFormat,
		filename, record.OrgID, record.BucketID, key, typeName,
		record.MinTime, record.MaxTime, record.Checksum, record.Count,
	); err != nil {
		return err
	}
	return nil
}

// blockExportRecord holds the values exported for a single block; each field
// maps to one column of the INSERT statement produced by write.
type blockExportRecord struct {
// Filename is the path that was passed to ExportFile.
Filename string
// OrgID and BucketID are decoded from the first 16 bytes of the block key.
// They are left at zero when the key is shorter than 16 bytes.
OrgID influxdb.ID
BucketID influxdb.ID
// Key is the block key with the 16-byte org/bucket prefix removed, or the
// whole key when it is shorter than 16 bytes.
Key string
// Type is the raw block type byte; it is exported via BlockTypeName.
Type byte
// MinTime and MaxTime are the block's time bounds as reported by the
// block iterator.
MinTime int64
MaxTime int64
// Checksum is the block checksum as reported by the block iterator.
Checksum uint32
// Count is the result of BlockCount on the block's raw bytes.
Count int
}

func quoteSQL(s string) string {
return `'` + sqlReplacer.Replace(toValidUTF8(s)) + `'`
}

// sqlReplacer escapes text for use inside a single-quoted SQL literal: each
// single quote is doubled and NUL bytes are removed.
var sqlReplacer = strings.NewReplacer(`'`, `''`, "\x00", "")

// toValidUTF8 returns s with every byte that is not part of a valid UTF-8
// sequence removed.
//
// Only decoding failures are dropped. A correctly encoded U+FFFD replacement
// character is valid UTF-8 and is kept, which is why the decoded width is
// checked in addition to the rune value. Strings that are already valid are
// returned unchanged without copying.
func toValidUTF8(s string) string {
	if utf8.ValidString(s) {
		return s
	}

	var b strings.Builder
	b.Grow(len(s))
	for i := 0; i < len(s); {
		r, size := utf8.DecodeRuneInString(s[i:])
		// (RuneError, 1) is how the decoder reports an invalid byte; an
		// encoded U+FFFD decodes with a width of 3.
		if r == utf8.RuneError && size == 1 {
			i++
			continue
		}
		b.WriteString(s[i : i+size])
		i += size
	}
	return b.String()
}
47 changes: 47 additions & 0 deletions tsdb/tsm1/block_exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package tsm1

import (
"bytes"
"fmt"
"os"
"testing"
)

// TestSQLBlockExporter_Export writes a two-key TSM file and checks that the
// exporter, with the schema preamble disabled, produces exactly one
// transaction containing one INSERT per block.
func TestSQLBlockExporter_Export(t *testing.T) {
	dir := mustTempDir()
	defer os.RemoveAll(dir)
	f := mustTempFile(dir)

	// Write data.
	w, err := NewTSMWriter(f)
	if err != nil {
		t.Fatal(err)
	}
	if err := w.Write([]byte("cpu"), []Value{NewValue(0, int64(1))}); err != nil {
		t.Fatal(err)
	}
	if err := w.Write([]byte("mem"), []Value{NewValue(0, int64(2))}); err != nil {
		t.Fatal(err)
	}
	if err := w.WriteIndex(); err != nil {
		t.Fatal(err)
	}
	if err := w.Close(); err != nil {
		t.Fatal(err)
	}

	// Expected output. Both keys are shorter than 16 bytes, so the org and
	// bucket IDs are exported as zero.
	want := fmt.Sprintf(`
BEGIN TRANSACTION;
INSERT INTO blocks (filename, org_id, bucket_id, key, type, min_time, max_time, checksum, count) VALUES ('%s', 0, 0, 'cpu', 'integer', 0, 0, 3294968665, 1);
INSERT INTO blocks (filename, org_id, bucket_id, key, type, min_time, max_time, checksum, count) VALUES ('%s', 0, 0, 'mem', 'integer', 0, 0, 755408492, 1);
COMMIT;
`[1:], f.Name(), f.Name())

	// Export file to SQL.
	var out bytes.Buffer
	exporter := NewSQLBlockExporter(&out)
	exporter.ShowSchema = false
	if err := exporter.ExportFile(f.Name()); err != nil {
		t.Fatal(err)
	}
	if err := exporter.Close(); err != nil {
		t.Fatal(err)
	}
	if got := out.String(); got != want {
		t.Fatalf("unexpected output:\ngot=%s\n--\nwant=%s", got, want)
	}
}
18 changes: 18 additions & 0 deletions tsdb/tsm1/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,3 +840,21 @@ func getBooleanEncoder(sz int) BooleanEncoder {
return x
}
// putBooleanEncoder hands enc back to booleanEncoderPool.
func putBooleanEncoder(enc BooleanEncoder) {
	booleanEncoderPool.Put(enc)
}

// BlockTypeName returns a string name for the block type.
//
// The five known block type constants map to "float64", "integer", "boolean",
// "string" and "unsigned". Any other value is rendered as "unknown(N)", where
// N is the decimal value of typ.
func BlockTypeName(typ byte) string {
	var name string
	switch typ {
	case BlockFloat64:
		name = "float64"
	case BlockInteger:
		name = "integer"
	case BlockBoolean:
		name = "boolean"
	case BlockString:
		name = "string"
	case BlockUnsigned:
		name = "unsigned"
	default:
		name = fmt.Sprintf("unknown(%d)", typ)
	}
	return name
}