Skip to content

Commit

Permalink
Merge pull request #14233 from influxdata/feat/tsm1-block-exporter
Browse files Browse the repository at this point in the history
feat(tsdb): Add block exporter.
  • Loading branch information
benbjohnson authored Jul 1, 2019
2 parents c85a0f8 + 08e24fa commit 90a529e
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 0 deletions.
30 changes: 30 additions & 0 deletions cmd/influxd/inspect/export_blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package inspect

import (
"os"

"github.com/influxdata/influxdb/tsdb/tsm1"
"github.com/spf13/cobra"
)

// NewExportBlocksCommand builds the "export-blocks" sub-command. Every
// positional argument is treated as the path of a TSM1 file, and the blocks
// of each file are exported as SQL to standard output.
func NewExportBlocksCommand() *cobra.Command {
	// run exports each file in order and stops at the first failure.
	run := func(cmd *cobra.Command, args []string) error {
		exporter := tsm1.NewSQLBlockExporter(os.Stdout)
		for i := range args {
			if err := exporter.ExportFile(args[i]); err != nil {
				return err
			}
		}
		return exporter.Close()
	}

	return &cobra.Command{
		Use:   `export-blocks`,
		Short: "Exports block data",
		Long: `
This command will export all blocks in one or more TSM1 files to
another format for easier inspection and debugging.`,
		RunE: run,
	}
}
1 change: 1 addition & 0 deletions cmd/influxd/inspect/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func NewCommand() *cobra.Command {
// List of available sub-commands
// If a new sub-command is created, it must be added here
subCommands := []*cobra.Command{
NewExportBlocksCommand(),
NewReportTSMCommand(),
NewVerifyWALCommand(),
}
Expand Down
5 changes: 5 additions & 0 deletions tsdb/explode.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ func DecodeName(name [16]byte) (org, bucket platform.ID) {
return
}

// DecodeNameSlice converts the tsdb internal serialization back to
// organization and bucket IDs. The first 8 bytes of name hold the
// organization ID and the next 8 bytes hold the bucket ID, both big-endian.
// name must contain at least 16 bytes; shorter input panics on slicing.
func DecodeNameSlice(name []byte) (org, bucket platform.ID) {
	org = platform.ID(binary.BigEndian.Uint64(name[0:8]))
	bucket = platform.ID(binary.BigEndian.Uint64(name[8:16]))
	return
}

// EncodeName converts org/bucket pairs to the tsdb internal serialization
func EncodeName(org, bucket platform.ID) [16]byte {
var nameBytes [16]byte
Expand Down
173 changes: 173 additions & 0 deletions tsdb/tsm1/block_exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package tsm1

import (
"errors"
"fmt"
"io"
"os"
"strings"
"unicode/utf8"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb"
)

// BlockExporter writes all blocks in a file to a given format.
type BlockExporter interface {
	// ExportFile exports every block of the file at filename.
	ExportFile(filename string) error

	// Close ends the export.
	io.Closer
}

// Compile-time check that *SQLBlockExporter implements BlockExporter.
var _ BlockExporter = (*SQLBlockExporter)(nil)

// SQLBlockExporter writes out all blocks for TSM files to SQL.
type SQLBlockExporter struct {
	// ShowSchema controls whether the schema statements are written
	// before the first exported file.
	ShowSchema bool

	w           io.Writer // destination for all generated SQL
	initialized bool      // true when initial block written
}

// NewSQLBlockExporter returns a new instance of SQLBlockExporter that writes
// its SQL to w. Schema output is enabled by default.
func NewSQLBlockExporter(w io.Writer) *SQLBlockExporter {
	e := new(SQLBlockExporter)
	e.w = w
	e.ShowSchema = true
	return e
}

// Close ends the export. It currently writes nothing and always returns nil;
// it is what lets SQLBlockExporter satisfy the io.Closer part of
// BlockExporter.
func (e *SQLBlockExporter) Close() error {
return nil
}

// ExportFile writes all blocks of the TSM file at filename as SQL.
//
// On the first call the exporter is initialized, which emits the schema when
// ShowSchema is set. Each file is then written as one INSERT statement per
// block, wrapped in a single BEGIN TRANSACTION/COMMIT pair.
//
// An error is returned if the file cannot be opened or read as a TSM file,
// if a block cannot be read, or if writing to the underlying writer fails.
// When an error occurs after BEGIN TRANSACTION has been written, no COMMIT
// is emitted, so the output ends with an open transaction.
func (e *SQLBlockExporter) ExportFile(filename string) error {
	if !e.initialized {
		if err := e.initialize(); err != nil {
			return err
		}
	}

	f, err := os.OpenFile(filename, os.O_RDONLY, 0600)
	if err != nil {
		return err
	}
	defer f.Close()

	r, err := NewTSMReader(f)
	if err != nil {
		return err
	}
	// Covers the early-return paths; the success path closes explicitly
	// below so that the close error can be reported.
	defer r.Close()

	itr := r.BlockIterator()
	if itr == nil {
		return errors.New("invalid TSM file, no block iterator")
	}

	// A failed write here must not be ignored: otherwise a truncated export
	// would be reported as successful.
	if _, err := fmt.Fprintln(e.w, `BEGIN TRANSACTION;`); err != nil {
		return err
	}
	for itr.Next() {
		key, minTime, maxTime, typ, checksum, buf, err := itr.Read()
		if err != nil {
			return err
		}

		// Extract organization & bucket ID. Keys shorter than 16 bytes
		// cannot carry the ID prefix and are exported verbatim with zero IDs.
		var record blockExportRecord
		record.Filename = filename
		if len(key) < 16 {
			record.Key = string(key)
		} else {
			record.OrgID, record.BucketID = tsdb.DecodeNameSlice(key[:16])
			record.Key = string(key[16:])
		}
		record.Type = typ
		record.MinTime = minTime
		record.MaxTime = maxTime
		record.Checksum = checksum
		record.Count = BlockCount(buf)

		if err := e.write(&record); err != nil {
			return err
		}
	}
	// NOTE(review): if the block iterator exposes an error accessor, it
	// should be checked here before committing — confirm against its API.
	if _, err := fmt.Fprintln(e.w, "COMMIT;"); err != nil {
		return err
	}

	if err := r.Close(); err != nil {
		return fmt.Errorf("tsm1.SQLBlockExporter: cannot close reader: %s", err)
	}

	return nil
}

// initialize performs the one-time setup of the export. When ShowSchema is
// set it writes the CREATE TABLE and CREATE INDEX statements for the blocks
// table to the underlying writer.
//
// The exporter is marked as initialized only after the schema has been
// written successfully; a write error is returned to the caller and leaves
// the exporter uninitialized.
func (e *SQLBlockExporter) initialize() error {
	if e.ShowSchema {
		// The [1:] strips the leading newline of the raw string literal.
		if _, err := fmt.Fprintln(e.w, `
CREATE TABLE IF NOT EXISTS blocks (
filename TEXT NOT NULL,
org_id INTEGER NOT NULL,
bucket_id INTEGER NOT NULL,
key TEXT NOT NULL,
"type" TEXT NOT NULL,
min_time INTEGER NOT NULL,
max_time INTEGER NOT NULL,
checksum INTEGER NOT NULL,
count INTEGER NOT NULL
);
CREATE INDEX idx_blocks_filename ON blocks (filename);
CREATE INDEX idx_blocks_org_id_bucket_id_key ON blocks (org_id, bucket_id, key);
`[1:]); err != nil {
			return err
		}
	}

	e.initialized = true

	return nil
}

// write emits a single INSERT statement for record to the underlying writer.
// String columns are passed through quoteSQL; numeric columns are written
// as decimal integers.
func (e *SQLBlockExporter) write(record *blockExportRecord) error {
	const insertFormat = "INSERT INTO blocks (filename, org_id, bucket_id, key, type, min_time, max_time, checksum, count) VALUES (%s, %d, %d, %s, %s, %d, %d, %d, %d);\n"

	filename := quoteSQL(record.Filename)
	key := quoteSQL(record.Key)
	typeName := quoteSQL(BlockTypeName(record.Type))

	if _, err := fmt.Fprintf(e.w, insertFormat,
		filename,
		record.OrgID,
		record.BucketID,
		key,
		typeName,
		record.MinTime,
		record.MaxTime,
		record.Checksum,
		record.Count,
	); err != nil {
		return err
	}
	return nil
}

// blockExportRecord holds the values written for one exported block; each
// field corresponds to one column of the generated INSERT statement.
type blockExportRecord struct {
	Filename        string // file the block was read from
	OrgID, BucketID influxdb.ID
	Key             string // block key with the org/bucket prefix removed when present
	Type            byte   // block type, rendered via BlockTypeName
	MinTime, MaxTime int64
	Checksum        uint32
	Count           int // result of BlockCount for the block
}

// quoteSQL returns s as a single-quoted SQL string literal. The input is
// first passed through toValidUTF8 and then through sqlReplacer, which
// doubles embedded single quotes and removes NUL bytes.
func quoteSQL(s string) string {
	escaped := sqlReplacer.Replace(toValidUTF8(s))
	return "'" + escaped + "'"
}

// sqlReplacer escapes text for use inside a single-quoted SQL literal:
// single quotes are doubled and NUL bytes are removed.
var sqlReplacer = strings.NewReplacer(
	"'", "''",
	"\x00", "",
)

// toValidUTF8 returns s with every byte that is not part of a valid UTF-8
// sequence removed.
//
// Only bytes that fail to decode are dropped. A correctly encoded U+FFFD
// (the replacement character) is valid UTF-8 and is preserved, so invalid
// input is distinguished from it by the decoded width: a decoding failure
// yields utf8.RuneError with a width of 1, whereas a real U+FFFD has a
// width of 3.
func toValidUTF8(s string) string {
	// Fast path: nothing to strip, so avoid allocating.
	if utf8.ValidString(s) {
		return s
	}

	var b strings.Builder
	b.Grow(len(s))
	for i := 0; i < len(s); {
		r, size := utf8.DecodeRuneInString(s[i:])
		if r == utf8.RuneError && size == 1 {
			// Invalid byte: skip it.
			i++
			continue
		}
		b.WriteString(s[i : i+size])
		i += size
	}
	return b.String()
}
47 changes: 47 additions & 0 deletions tsdb/tsm1/block_exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package tsm1

import (
"bytes"
"fmt"
"os"
"testing"
)

// TestSQLBlockExporter_Export writes a TSM file with two single-value integer
// series and verifies the exact SQL produced by SQLBlockExporter when schema
// output is disabled.
func TestSQLBlockExporter_Export(t *testing.T) {
	dir := mustTempDir()
	defer os.RemoveAll(dir)
	f := mustTempFile(dir)

	// Write data.
	w, err := NewTSMWriter(f)
	if err != nil {
		t.Fatal(err)
	}
	if err := w.Write([]byte("cpu"), []Value{NewValue(0, int64(1))}); err != nil {
		t.Fatal(err)
	}
	if err := w.Write([]byte("mem"), []Value{NewValue(0, int64(2))}); err != nil {
		t.Fatal(err)
	}
	if err := w.WriteIndex(); err != nil {
		t.Fatal(err)
	}
	if err := w.Close(); err != nil {
		t.Fatal(err)
	}

	// Expected output.
	want := fmt.Sprintf(`
BEGIN TRANSACTION;
INSERT INTO blocks (filename, org_id, bucket_id, key, type, min_time, max_time, checksum, count) VALUES ('%s', 0, 0, 'cpu', 'integer', 0, 0, 3294968665, 1);
INSERT INTO blocks (filename, org_id, bucket_id, key, type, min_time, max_time, checksum, count) VALUES ('%s', 0, 0, 'mem', 'integer', 0, 0, 755408492, 1);
COMMIT;
`[1:], f.Name(), f.Name())

	// Export file to SQL.
	var buf bytes.Buffer
	e := NewSQLBlockExporter(&buf)
	e.ShowSchema = false
	if err := e.ExportFile(f.Name()); err != nil {
		t.Fatal(err)
	}
	if err := e.Close(); err != nil {
		t.Fatal(err)
	}
	if got := buf.String(); got != want {
		t.Fatalf("unexpected output:\ngot=%s\n--\nwant=%s", got, want)
	}
}
18 changes: 18 additions & 0 deletions tsdb/tsm1/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,3 +840,21 @@ func getBooleanEncoder(sz int) BooleanEncoder {
return x
}
func putBooleanEncoder(enc BooleanEncoder) { booleanEncoderPool.Put(enc) }

// BlockTypeName returns a string name for the block type. Types that are
// not recognized are rendered as "unknown(N)", where N is the numeric value
// of typ.
func BlockTypeName(typ byte) string {
	switch typ {
	case BlockFloat64:
		return "float64"
	case BlockInteger:
		return "integer"
	case BlockBoolean:
		return "boolean"
	case BlockString:
		return "string"
	case BlockUnsigned:
		return "unsigned"
	}
	return fmt.Sprintf("unknown(%d)", typ)
}

0 comments on commit 90a529e

Please sign in to comment.