-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
Copy pathblock_exporter.go
173 lines (146 loc) · 3.57 KB
/
block_exporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package tsm1
import (
"errors"
"fmt"
"io"
"os"
"strings"
"unicode/utf8"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb"
)
// BlockExporter writes all blocks in a file to a given format.
type BlockExporter interface {
io.Closer
ExportFile(filename string) error
}
// Ensure type implements interface.
var _ BlockExporter = (*SQLBlockExporter)(nil)
// SQLBlockExporter writes out all blocks for TSM files to SQL.
type SQLBlockExporter struct {
w io.Writer
initialized bool // true when initial block written
// Write schema, if true.
ShowSchema bool
}
// NewSQLBlockExporter returns a new instance of SQLBlockExporter.
func NewSQLBlockExporter(w io.Writer) *SQLBlockExporter {
return &SQLBlockExporter{
w: w,
ShowSchema: true,
}
}
// Close ends the export and writes final output.
func (e *SQLBlockExporter) Close() error {
return nil
}
// ExportFile writes all blocks of the TSM file.
func (e *SQLBlockExporter) ExportFile(filename string) error {
if !e.initialized {
if err := e.initialize(); err != nil {
return err
}
}
f, err := os.OpenFile(filename, os.O_RDONLY, 0600)
if err != nil {
return err
}
defer f.Close()
r, err := NewTSMReader(f)
if err != nil {
return err
}
defer r.Close()
itr := r.BlockIterator()
if itr == nil {
return errors.New("invalid TSM file, no block iterator")
}
fmt.Fprintln(e.w, `BEGIN TRANSACTION;`)
for itr.Next() {
key, minTime, maxTime, typ, checksum, buf, err := itr.Read()
if err != nil {
return err
}
// Extract organization & bucket ID.
var record blockExportRecord
record.Filename = filename
if len(key) < 16 {
record.Key = string(key)
} else {
record.OrgID, record.BucketID = tsdb.DecodeNameSlice(key[:16])
record.Key = string(key[16:])
}
record.Type = typ
record.MinTime = minTime
record.MaxTime = maxTime
record.Checksum = checksum
record.Count = BlockCount(buf)
if err := e.write(&record); err != nil {
return err
}
}
fmt.Fprintln(e.w, "COMMIT;")
if err := r.Close(); err != nil {
return fmt.Errorf("tsm1.SQLBlockExporter: cannot close reader: %s", err)
}
return nil
}
func (e *SQLBlockExporter) initialize() error {
if e.ShowSchema {
fmt.Fprintln(e.w, `
CREATE TABLE IF NOT EXISTS blocks (
filename TEXT NOT NULL,
org_id INTEGER NOT NULL,
bucket_id INTEGER NOT NULL,
key TEXT NOT NULL,
"type" TEXT NOT NULL,
min_time INTEGER NOT NULL,
max_time INTEGER NOT NULL,
checksum INTEGER NOT NULL,
count INTEGER NOT NULL
);
CREATE INDEX idx_blocks_filename ON blocks (filename);
CREATE INDEX idx_blocks_org_id_bucket_id_key ON blocks (org_id, bucket_id, key);
`[1:])
}
e.initialized = true
return nil
}
func (e *SQLBlockExporter) write(record *blockExportRecord) error {
_, err := fmt.Fprintf(e.w,
"INSERT INTO blocks (filename, org_id, bucket_id, key, type, min_time, max_time, checksum, count) VALUES (%s, %d, %d, %s, %s, %d, %d, %d, %d);\n",
quoteSQL(record.Filename),
record.OrgID,
record.BucketID,
quoteSQL(record.Key),
quoteSQL(BlockTypeName(record.Type)),
record.MinTime,
record.MaxTime,
record.Checksum,
record.Count,
)
return err
}
type blockExportRecord struct {
Filename string
OrgID influxdb.ID
BucketID influxdb.ID
Key string
Type byte
MinTime int64
MaxTime int64
Checksum uint32
Count int
}
func quoteSQL(s string) string {
return `'` + sqlReplacer.Replace(toValidUTF8(s)) + `'`
}
var sqlReplacer = strings.NewReplacer(`'`, `''`, "\x00", "")
func toValidUTF8(s string) string {
return strings.Map(func(r rune) rune {
if r == utf8.RuneError {
return -1
}
return r
}, s)
}