-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathapi.go
More file actions
143 lines (115 loc) · 4.19 KB
/
api.go
File metadata and controls
143 lines (115 loc) · 4.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Package storage defines storage interfaces.
package storage
import (
"context"
"strings"
"sync"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/oasisprotocol/nexus/common"
)
type BatchItem struct {
Cmd string
Args []interface{}
}
// QueryBatch represents a batch of queries to be executed atomically.
// We use a custom type that mirrors `pgx.Batch`, but is thread-safe to use and
// allows introspection for debugging.
type QueryBatch struct {
items []*BatchItem
mu sync.Mutex
}
// QueryResults represents the results from a read query.
type QueryResults = pgx.Rows
// QueryResult represents the result from a read query.
type QueryResult = pgx.Row
// TxOptions encodes the way DB transactions are executed.
type TxOptions = pgx.TxOptions
// Tx represents a database transaction.
type Tx = pgx.Tx
// Queue adds query to a batch.
func (b *QueryBatch) Queue(cmd string, args ...interface{}) {
b.mu.Lock()
defer b.mu.Unlock()
b.items = append(b.items, &BatchItem{
Cmd: cmd,
Args: args,
})
}
// Extend merges another batch into the current batch.
func (b *QueryBatch) Extend(qb *QueryBatch) {
b.mu.Lock()
defer b.mu.Unlock()
if qb != b {
qb.mu.Lock()
defer qb.mu.Unlock()
}
b.items = append(b.items, qb.items...)
}
// Len returns the number of queries in the batch.
func (b *QueryBatch) Len() int {
b.mu.Lock()
defer b.mu.Unlock()
return len(b.items)
}
// AsPgxBatch converts a QueryBatch to a pgx.Batch.
func (b *QueryBatch) AsPgxBatch() pgx.Batch {
b.mu.Lock()
defer b.mu.Unlock()
pgxBatch := pgx.Batch{}
for _, item := range b.items {
pgxBatch.Queue(item.Cmd, item.Args...)
}
return pgxBatch
}
// Queries returns the queries in the batch. Each item of the returned slice
// is composed of the SQL command and its arguments.
func (b *QueryBatch) Queries() []*BatchItem {
b.mu.Lock()
defer b.mu.Unlock()
return b.items
}
// TargetStorage defines an interface for reading and writing
// processed block data.
type TargetStorage interface {
// SendBatch sends a batch of queries to be applied to target storage.
SendBatch(ctx context.Context, batch *QueryBatch) error
// SendBatchWithOptions is like SendBatch, with custom DB options (e.g. level of tx isolation).
SendBatchWithOptions(ctx context.Context, batch *QueryBatch, opts TxOptions) error
// Query submits a query to fetch data from target storage.
Query(ctx context.Context, sql string, args ...interface{}) (QueryResults, error)
// QueryRow submits a query to fetch a single row of data from target storage.
QueryRow(ctx context.Context, sql string, args ...interface{}) QueryResult
// Exec executes a query without returning any rows.
Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)
// Begin starts a new transaction.
// XXX: Not the nicest that this exposes the underlying pgx.Tx interface. Could instead
// return a `TargetStorage`-like interface wrapper, that only exposes Query/QueryRow/SendBatch/SendBatchWithOptions
// and Commit/Rollback.
Begin(ctx context.Context) (Tx, error)
// Close shuts down the target storage client.
Close()
// Name returns the name of the target storage.
Name() string
// Wipe removes all contents of the target storage.
Wipe(ctx context.Context) error
// DisableTriggersAndFKConstraints disables all triggers and foreign key constraints
// in nexus tables. This is useful when inserting blockchain data out of order,
// so that later blocks can refer to (yet unindexed) earlier blocks without violating constraints.
DisableTriggersAndFKConstraints(ctx context.Context) error
// EnableTriggersAndFKConstraints enables all triggers and foreign key constraints
// in the given schema.
// WARNING: This might enable triggers not explicitly disabled by DisableTriggersAndFKConstraints.
// WARNING: This does not enforce/check contraints on rows that were inserted while triggers were disabled.
EnableTriggersAndFKConstraints(ctx context.Context) error
}
// Postgres requires valid UTF-8 with no 0x00.
func SanitizeString(msg string) string {
return strings.ToValidUTF8(strings.ReplaceAll(msg, "\x00", "?"), "?")
}
func SanitizeStringP(msg *string) *string {
if msg == nil {
return nil
}
return common.Ptr(SanitizeString(*msg))
}