Skip to content

Commit 752dcfc

Browse files
committed
Refactor store
Add store interface and implement with offChainStore struct. Decompose storing into small chunks and keep state around storing writes and failure count. Move environment variables used for store setup into the setup phase of the listen function. Signed-off-by: Stanislav Jakuschevskij <stas@two-giants.com>
1 parent 4fa17db commit 752dcfc

File tree

2 files changed

+100
-63
lines changed

2 files changed

+100
-63
lines changed

off_chain_data/application-go/listen.go

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"os"
88
"os/signal"
99
"slices"
10+
"strconv"
1011
"strings"
1112
"syscall"
1213

@@ -35,12 +36,15 @@ func listen(clientConnection grpc.ClientConnInterface) error {
3536
checkpointer.Close()
3637
fmt.Println("Checkpointer closed.")
3738
}()
38-
3939
fmt.Println("Start event listening from block", checkpointer.BlockNumber())
4040
fmt.Println("Last processed transaction ID within block:", checkpointer.TransactionID())
41+
42+
simulatedFailureCount := initSimulatedFailureCount()
4143
if simulatedFailureCount > 0 {
4244
fmt.Printf("Simulating a write failure every %d transactions\n", simulatedFailureCount)
4345
}
46+
storeFile := envOrDefault("STORE_FILE", "store.log")
47+
offChainStore := newOffChainStore(storeFile, simulatedFailureCount)
4448

4549
ctx, close := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
4650
defer func() {
@@ -65,7 +69,7 @@ func listen(clientConnection grpc.ClientConnInterface) error {
6569
aBlockProcessor := blockProcessor{
6670
parser.ParseBlock(blockProto),
6771
checkpointer,
68-
applyWritesToOffChainStore,
72+
offChainStore,
6973
}
7074

7175
if err := aBlockProcessor.process(); err != nil {
@@ -77,10 +81,46 @@ func listen(clientConnection grpc.ClientConnInterface) error {
7781
return nil
7882
}
7983

84+
func initSimulatedFailureCount() uint {
85+
valueAsString := envOrDefault("SIMULATED_FAILURE_COUNT", "0")
86+
result, err := strconv.ParseUint(valueAsString, 10, 0)
87+
if err != nil {
88+
panic(fmt.Errorf("invalid SIMULATED_FAILURE_COUNT value: %s", valueAsString))
89+
}
90+
91+
return uint(result)
92+
}
93+
94+
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
95+
type store interface {
96+
write(ledgerUpdate) error
97+
}
98+
99+
// Ledger update made by a specific transaction.
100+
type ledgerUpdate struct {
101+
BlockNumber uint64
102+
TransactionID string
103+
Writes []write
104+
}
105+
106+
// Description of a ledger Write that can be applied to an off-chain data store.
107+
type write struct {
108+
// Channel whose ledger is being updated.
109+
ChannelName string `json:"channelName"`
110+
// Namespace within the ledger.
111+
Namespace string `json:"namespace"`
112+
// Key name within the ledger namespace.
113+
Key string `json:"key"`
114+
// Whether the key and associated value are being deleted.
115+
IsDelete bool `json:"isDelete"`
116+
// If `isDelete` is false, the Value written to the key; otherwise ignored.
117+
Value string `json:"value"`
118+
}
119+
80120
type blockProcessor struct {
81121
parsedBlock *parser.Block
82122
checkpointer *client.FileCheckpointer
83-
writeToStore writer
123+
store store
84124
}
85125

86126
func (b *blockProcessor) process() error {
@@ -95,7 +135,7 @@ func (b *blockProcessor) process() error {
95135
txProcessor := transactionProcessor{
96136
b.parsedBlock.Number(),
97137
validTransaction,
98-
b.writeToStore,
138+
b.store,
99139
}
100140
if err := txProcessor.process(); err != nil {
101141
return err
@@ -182,9 +222,9 @@ func (b *blockProcessor) findLastProcessedIndex() (int, error) {
182222
}
183223

184224
type transactionProcessor struct {
185-
blockNumber uint64
186-
transaction *parser.Transaction
187-
writeToStore writer
225+
blockNumber uint64
226+
transaction *parser.Transaction
227+
store store
188228
}
189229

190230
func (t *transactionProcessor) process() error {
@@ -201,8 +241,7 @@ func (t *transactionProcessor) process() error {
201241
}
202242

203243
fmt.Println("Process transaction", transactionID)
204-
205-
if err := t.writeToStore(ledgerUpdate{
244+
if err := t.store.write(ledgerUpdate{
206245
BlockNumber: t.blockNumber,
207246
TransactionID: transactionID,
208247
Writes: writes,

off_chain_data/application-go/store.go

Lines changed: 52 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -5,91 +5,89 @@ import (
55
"errors"
66
"fmt"
77
"os"
8-
"strconv"
9-
"strings"
108
)
119

12-
var storeFile = envOrDefault("STORE_FILE", "store.log")
13-
var simulatedFailureCount = getSimulatedFailureCount()
14-
var transactionCount uint = 0 // Used only to simulate failures
15-
16-
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
17-
type writer = func(ledgerUpdate) error
10+
var errExpected = errors.New("expected error: simulated write failure")
1811

19-
// Ledger update made by a specific transaction.
20-
type ledgerUpdate struct {
21-
BlockNumber uint64
22-
TransactionID string
23-
Writes []write
12+
type offChainStore struct {
13+
writes, path string
14+
simulatedFailureCount, transactionCount uint
2415
}
2516

26-
// Description of a ledger Write that can be applied to an off-chain data store.
27-
type write struct {
28-
// Channel whose ledger is being updated.
29-
ChannelName string `json:"channelName"`
30-
// Namespace within the ledger.
31-
Namespace string `json:"namespace"`
32-
// Key name within the ledger namespace.
33-
Key string `json:"key"`
34-
// Whether the key and associated value are being deleted.
35-
IsDelete bool `json:"isDelete"`
36-
// If `isDelete` is false, the Value written to the key; otherwise ignored.
37-
Value string `json:"value"`
17+
func newOffChainStore(path string, simulatedFailureCount uint) *offChainStore {
18+
return &offChainStore{
19+
"",
20+
path,
21+
uint(simulatedFailureCount),
22+
0,
23+
}
3824
}
3925

4026
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
4127
// This implementation just writes to a file.
42-
func applyWritesToOffChainStore(data ledgerUpdate) error {
43-
if err := simulateFailureIfRequired(); err != nil {
28+
func (ocs *offChainStore) write(data ledgerUpdate) error {
29+
if err := ocs.simulateFailureIfRequired(); err != nil {
4430
return err
4531
}
4632

47-
writes := []string{}
48-
for _, write := range data.Writes {
49-
marshaled, err := json.Marshal(write)
50-
if err != nil {
51-
return err
52-
}
53-
54-
writes = append(writes, string(marshaled))
55-
}
33+
ocs.clearLastWrites()
5634

57-
f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
58-
if err != nil {
35+
if err := ocs.marshal(data.Writes); err != nil {
5936
return err
6037
}
6138

62-
if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil {
63-
f.Close()
39+
if err := ocs.persist(); err != nil {
6440
return err
6541
}
6642

67-
if err := f.Close(); err != nil {
68-
return err
43+
return nil
44+
}
45+
46+
func (ocs *offChainStore) simulateFailureIfRequired() error {
47+
if ocs.simulatedFailureCount > 0 && ocs.transactionCount >= ocs.simulatedFailureCount {
48+
ocs.transactionCount = 0
49+
return errExpected
6950
}
7051

52+
ocs.transactionCount += 1
53+
7154
return nil
7255
}
7356

74-
var errExpected = errors.New("expected error: simulated write failure")
57+
func (ocs *offChainStore) clearLastWrites() {
58+
ocs.writes = ""
59+
}
7560

76-
func simulateFailureIfRequired() error {
77-
if simulatedFailureCount > 0 && transactionCount >= simulatedFailureCount {
78-
transactionCount = 0
79-
return errExpected
80-
}
61+
func (ocs *offChainStore) marshal(writes []write) error {
62+
for _, write := range writes {
63+
marshaled, err := json.Marshal(write)
64+
if err != nil {
65+
return err
66+
}
8167

82-
transactionCount += 1
68+
ocs.writes += string(marshaled) + "\n"
69+
}
8370

8471
return nil
8572
}
8673

87-
func getSimulatedFailureCount() uint {
88-
valueAsString := envOrDefault("SIMULATED_FAILURE_COUNT", "0")
89-
result, err := strconv.ParseUint(valueAsString, 10, 0)
74+
func (ocs *offChainStore) persist() error {
75+
f, err := os.OpenFile(ocs.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
9076
if err != nil {
91-
panic(fmt.Errorf("invalid SIMULATED_FAILURE_COUNT value: %s", valueAsString))
77+
return err
9278
}
9379

94-
return uint(result)
80+
if _, writeErr := f.Write([]byte(ocs.writes)); writeErr != nil {
81+
if closeErr := f.Close(); closeErr != nil {
82+
return fmt.Errorf("write error: %v, close error: %v", writeErr, closeErr)
83+
}
84+
85+
return writeErr
86+
}
87+
88+
if err := f.Close(); err != nil {
89+
return err
90+
}
91+
92+
return nil
9593
}

0 commit comments

Comments
 (0)