tx_analyzer.go
package main

import (
	"flag"
	"io/ioutil"
	"log"
	"os"
	"sync"
	"time"
)

// Global configuration, set from command-line flags in main.
var SEND_EMAIL bool
var N_WORKERS int
var BACKUP_JSON bool
var JSON_DIR string
var WORKER_PROGRESS_DIR string
var MIN_DIST_FROM_TIP int64

const SHOW_QUERIES = false
const JSON_DIR_RELATIVE = "/db-backup"
const WORKER_PROGRESS_DIR_RELATIVE = "/worker-progress"
const N_WORKERS_DEFAULT = 2
const DB_WAIT_TIME = 30 // minimum seconds between database batch writes
const MAX_ATTEMPTS = 3  // max number of DB write attempts before giving up
const DEFAULT_DIST_FROM_TIP = 6
const CURRENT_VERSION_NUMBER = 2
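
// Illustrative invocations, based on the flags defined in main below (the binary
// name "tx_analyzer" is an assumption; adjust it to however this package is built):
//
//	tx_analyzer -start=500000 -end=510000 -workers=4   # backfill a fixed block range
//	tx_analyzer -tipdist=6                             # live analysis, 6 blocks behind the tip
//	tx_analyzer -recovery                              # resume unfinished work from ./worker-progress
//	tx_analyzer -mempool                               # mempool analysis
//	tx_analyzer -insert-json                           # insert ./db-backup .json files into PostgreSQL
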
func main() {
	sendEmailPtr := flag.Bool("email", false, "Set to true to send email upon failure. \n(Need to set additional environment variables: RECIPIENT_EMAILS should be a comma-separated list of recipient emails, \n EMAIL_ADDR, EMAIL_PASSWORD for the sending address must also be set)")
	tipDistPtr := flag.Int64("tipdist", DEFAULT_DIST_FROM_TIP, "Number of blocks behind tip (during live analysis).")
	nWorkersPtr := flag.Int("workers", N_WORKERS_DEFAULT, "Number of concurrent workers.")
	startPtr := flag.Int("start", 0, "Starting blockheight.")
	endPtr := flag.Int("end", -1, "Last blockheight to analyze.")

	// Flags for different modes of operation. The default is live analysis / back-filling.
	mempoolPtr := flag.Bool("mempool", false, "Set to true to start a mempool analysis")
	insertPtr := flag.Bool("insert-json", false, "Set to true to insert .json data files into PostgreSQL")
	recoveryFlagPtr := flag.Bool("recovery", false, "Set to true to start workers on files in ./worker-progress")
	jsonPtr := flag.Bool("json", true, "Set to false to stop json logging in /db-backup")
	flag.Parse()

	// Set global variables.
	N_WORKERS = *nWorkersPtr
	BACKUP_JSON = *jsonPtr
	MIN_DIST_FROM_TIP = *tipDistPtr
	SEND_EMAIL = *sendEmailPtr

	currentDir, err := os.Getwd()
	if err != nil {
		log.Fatal("Error getting working directory: ", err)
	}

	// Create directory for json files.
	if BACKUP_JSON {
		JSON_DIR = currentDir + JSON_DIR_RELATIVE
		createDirIfNotExist(JSON_DIR)
	}

	// Create the worker progress directory if it doesn't exist yet.
	WORKER_PROGRESS_DIR = currentDir + WORKER_PROGRESS_DIR_RELATIVE
	if _, err := os.Stat(WORKER_PROGRESS_DIR); os.IsNotExist(err) {
		createDirIfNotExist(WORKER_PROGRESS_DIR)
	}

	if SEND_EMAIL {
		_, ok1 := os.LookupEnv("RECIPIENT_EMAILS")
		_, ok2 := os.LookupEnv("EMAIL_ADDR")
		_, ok3 := os.LookupEnv("EMAIL_PASSWORD")
		if !(ok1 && ok2 && ok3) {
			log.Fatal("-email option requires environment variables RECIPIENT_EMAILS, EMAIL_ADDR, and EMAIL_PASSWORD to be set!")
		}
	}

	if *mempoolPtr {
		liveMempoolAnalysis()
		return
	}

	if *insertPtr {
		toPostgres()
		return
	}

	if *recoveryFlagPtr {
		recoverFromFailure()
	}

	// If an end value is given, analyze that range
	// (the start value defaults to 0).
	if *endPtr > 0 {
		startBackfill(*startPtr, *endPtr)
		return
	}

	// Given no other arguments, start live analysis.
	doLiveAnalysis(*startPtr)
}
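
// createDirIfNotExist is defined elsewhere in this repository. A minimal sketch that
// is consistent with how it is used above (an assumption, not the actual
// implementation) could be:
//
//	func createDirIfNotExist(dir string) {
//		// 0755 so the process can write and others can read/traverse.
//		if err := os.MkdirAll(dir, 0755); err != nil {
//			log.Fatal("Error creating directory ", dir, ": ", err)
//		}
//	}
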
// startBackfill splits the work across N_WORKERS workers, each with its own RPC/DB clients
// (see the worked example after this function).
func startBackfill(start, end int) {
	var wg sync.WaitGroup
	workSplit := (end - start) / N_WORKERS
	log.Println("Work split: ", workSplit, end-start, start, end)
	formattedTime := time.Now().Format("01-02:15:04")
	for i := 0; i < N_WORKERS; i++ {
		wg.Add(1)
		go func(i int) {
			analyzeBlockRange(formattedTime, i, start+(workSplit*i), start+(workSplit*(i+1)))
			wg.Done()
		}(i)
	}
	wg.Wait()
}
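
// Illustrative worked example of the split above: with start=0, end=100 and
// N_WORKERS=3, workSplit = 100/3 = 33 (integer division), so the three workers
// cover [0, 33), [33, 66) and [66, 99). Note that the remainder, block 99 here,
// falls outside every worker's range because of the truncating division.
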
// analyzeBlockRange analyzes all blocks in the interval [start, end).
func analyzeBlockRange(formattedTime string, workerID, start, end int) {
	worker := setupWorker(formattedTime, workerID)
	defer worker.shutdown()

	// lastWriteTime holds the earliest time the next database write is allowed.
	// Waiting at least DB_WAIT_TIME seconds between writes prevents us from
	// overwhelming the database.
	lastWriteTime := time.Now()
	lastWriteTime = lastWriteTime.Add(DB_WAIT_TIME * time.Second)
	startTime := time.Now()

	// Record progress in a file.
	logProgressToFile(start, start, end, worker.workFile)
	for i := start; i < end; i++ {
		startBlock := time.Now()
		worker.analyzeBlock(int64(i))
		log.Printf("Worker %v: Done with %v blocks total (height=%v) after %v (%v) \n", workerID, i-start+1, i, time.Since(startTime), time.Since(startBlock))

		// Only write to the database if there hasn't been a write within the last
		// DB_WAIT_TIME seconds, but always write before finishing the range.
		if !time.Now().After(lastWriteTime) && (i != end-1) {
			continue
		}

		// Write to database.
		ok := worker.commitBatchInsert()
		if !ok {
			log.Println("DB write failed!", workerID)
			return
		}
		lastWriteTime = time.Now().Add(DB_WAIT_TIME * time.Second)

		// Record progress in the file, overwriting the previous record.
		logProgressToFile(start, i+1, end, worker.workFile)
	}
	log.Printf("Worker %v done analyzing %v blocks (height=%v) after %v\n", workerID, end-start, end, time.Since(startTime))
}
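
// logProgressToFile is defined elsewhere in this repository. Based on how it is called
// here and how parseProgress is used in recoverFromFailure below, the progress file
// appears to hold three integers: the original start, the next height to analyze, and
// the end of the range. A minimal sketch under those assumptions (the *os.File
// parameter type is also an assumption, and error handling is omitted):
//
//	func logProgressToFile(start, current, end int, f *os.File) {
//		f.Truncate(0) // overwrite the previous record rather than appending
//		f.Seek(0, 0)
//		fmt.Fprintf(f, "%d %d %d", start, current, end) // requires the "fmt" package
//	}
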
// analyzeBlock uses the getblockstats RPC to compute metrics for a single block.
// It then stores the result in a batch to be inserted into the database later.
func (worker *Worker) analyzeBlock(blockHeight int64) {
	// Use the getblockstats RPC and merge the results into the metrics struct.
	blockStatsRes, err := worker.client.GetBlockStats(blockHeight, nil)
	if err != nil {
		fatal("Error with getblockstats RPC: ", err)
	}
	blockStats := BlockStats{blockStatsRes}
	worker.batchInsert(blockStats)
}
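
// Background note: getblockstats is a Bitcoin Core RPC (available since v0.17) that
// returns per-block aggregates such as fee totals, fee-rate percentiles, transaction
// counts, and sizes, which is what the metrics above are built from. Passing nil as
// the second argument appears to leave the stats filter unset so that all fields are
// returned. For reference, the equivalent call from the command line looks roughly
// like:
//
//	bitcoin-cli getblockstats 600000 '["totalfee","txs","avgfeerate"]'
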
// recoverFromFailure checks the worker-progress directory for any unfinished work from a previous job.
// If there is any, it starts a new worker to continue the work of each previously failed worker.
func recoverFromFailure() {
	log.Println("Starting Recovery Process.")
	files, err := ioutil.ReadDir(WORKER_PROGRESS_DIR)
	if err != nil {
		fatal("Error reading worker-progress directory: ", err)
	}

	var wg sync.WaitGroup
	wg.Add(len(files))

	// workers acts as a counting semaphore: each token in the channel represents a free worker slot.
	workers := make(chan struct{}, N_WORKERS)
	for i := 0; i < N_WORKERS; i++ { // Signal that there are available workers.
		workers <- struct{}{}
	}

	progressFiles := make([]os.FileInfo, len(files))
	for i := 0; i < len(files); i++ {
		progressFiles[i] = files[i]
	}

	i := 0 // index into files, incremented at the bottom of the loop.
	for i < len(files) {
		// Check if any workers are free.
		select {
		case <-workers:
		default:
			// If all workers are busy, wait and try again.
			time.Sleep(1000 * time.Millisecond)
			continue
		}

		file := progressFiles[i]
		contentsBytes, err := ioutil.ReadFile(WORKER_PROGRESS_DIR + "/" + file.Name())
		if err != nil {
			fatal("Error reading worker-progress file: ", err)
		}
		contents := string(contentsBytes)
		progress := parseProgress(contents)

		log.Printf("Starting recovery worker %v on range [%v, %v) at height %v\n", i, progress[0], progress[2], progress[1])
		go func(i int) {
			analyzeBlockRange(time.Now().Format("01-02:15:04"), i, progress[1], progress[2])
			workers <- struct{}{}
			wg.Done()
		}(i)

		err = os.Remove(WORKER_PROGRESS_DIR + "/" + file.Name())
		if err != nil {
			fatal("Error removing file: ", err)
		}
		i++
	}
	wg.Wait()
	log.Println("Finished with Recovery.")
}
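
// parseProgress is defined elsewhere in this repository. From its use above, it must
// return at least three integers: the original start of the range, the next height to
// analyze, and the end of the range. A minimal sketch under the assumption that the
// progress file holds whitespace-separated integers (illustrative only, not the actual
// implementation):
//
//	func parseProgress(contents string) []int {
//		fields := strings.Fields(contents) // requires the "strings" package
//		progress := make([]int, len(fields))
//		for i, field := range fields {
//			n, err := strconv.Atoi(field) // requires the "strconv" package
//			if err != nil {
//				log.Fatal("Malformed progress file: ", err)
//			}
//			progress[i] = n
//		}
//		return progress
//	}
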
// doLiveAnalysis analyzes blocks as they come in live.
// To avoid dealing with re-orgs in this code-base, it stays at least
// MIN_DIST_FROM_TIP blocks behind the tip (6 by default).
func doLiveAnalysis(height int) {
	log.Println("Starting a live analysis of the blockchain.")
	worker := setupWorker(time.Now().Format("01-02:15:04"), height)
	defer worker.shutdown()

	blockCount, err := worker.client.GetBlockCount()
	if err != nil {
		fatal("Error with getblockcount RPC: ", err)
	}

	var lastAnalysisStarted int64
	if height == 0 {
		lastAnalysisStarted = blockCount - MIN_DIST_FROM_TIP
	} else {
		lastAnalysisStarted = int64(height)
	}

	// workers acts as a counting semaphore limiting the number of concurrent block analyses.
	workers := make(chan struct{}, N_WORKERS)
	for i := 0; i < N_WORKERS; i++ {
		workers <- struct{}{}
	}

	heightInRangeOfTip := (blockCount - lastAnalysisStarted) <= MIN_DIST_FROM_TIP
	for {
		// Check if any workers are free.
		select {
		case <-workers:
		default:
			time.Sleep(500 * time.Millisecond)
			continue
		}

		if heightInRangeOfTip {
			// Caught up to the tip: poll for new blocks and return the worker token.
			time.Sleep(500 * time.Millisecond)
			blockCount, err = worker.client.GetBlockCount()
			if err != nil {
				fatal("Error with getblockcount RPC: ", err)
			}
			workers <- struct{}{}
		} else {
			go func(blockHeight int64) {
				analyzeBlockLive(blockHeight)
				workers <- struct{}{}
			}(lastAnalysisStarted)
			lastAnalysisStarted++
		}
		heightInRangeOfTip = (blockCount - lastAnalysisStarted) <= MIN_DIST_FROM_TIP
	}
}
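
// fatal is defined elsewhere in this repository. Given the -email flag ("send email
// upon failure") and the environment variables checked in main, it presumably notifies
// the configured recipients before exiting; sendFailureEmail below is a hypothetical
// helper named only for illustration:
//
//	func fatal(args ...interface{}) {
//		if SEND_EMAIL {
//			sendFailureEmail(os.Getenv("RECIPIENT_EMAILS"), fmt.Sprint(args...)) // hypothetical helper
//		}
//		log.Fatal(args...)
//	}
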
// analyzeBlockLive uses the getblockstats RPC to compute metrics for a single block.
// It then stores the results in the database (and in a json file if desired).
func analyzeBlockLive(blockHeight int64) {
	worker := setupWorker(time.Now().Format("01-02:15:04"), int(blockHeight))
	defer worker.shutdown()
	start := time.Now()

	// Record progress in a file.
	logProgressToFile(int(blockHeight), int(blockHeight), int(blockHeight), worker.workFile)

	// Use the getblockstats RPC and merge the results into the metrics struct.
	blockStatsRes, err := worker.client.GetBlockStats(blockHeight, nil)
	if err != nil {
		fatal("Error with getblockstats RPC: ", err)
	}
	blockStats := BlockStats{blockStatsRes}

	// Insert into the database.
	ok := worker.insert(blockStats)
	if !ok {
		log.Printf("DB write failed!")
		return
	}
	log.Printf("Done with block %v after %v\n", blockHeight, time.Since(start))
}
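
// Note: the Worker type, setupWorker, BlockStats, logProgressToFile,
// createDirIfNotExist, parseProgress, fatal, liveMempoolAnalysis and toPostgres are
// defined in other files of this package. From the calls in this file alone, a Worker
// exposes at least a client with GetBlockCount/GetBlockStats, a workFile, and the
// analyzeBlock, batchInsert, commitBatchInsert, insert and shutdown methods; the field
// types sketched below are assumptions made purely for illustration:
//
//	type Worker struct {
//		client   *rpcclient.Client // Bitcoin Core-compatible RPC client (assumed type)
//		workFile *os.File          // per-worker progress file (assumed type)
//		// ... plus whatever state the batch-insert methods need
//	}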