Skip to content

Commit 43f67fb

Browse files
committed
Fix simulated failure issue
Before all transactions were processed and when the failure was simulated a message was printed and all the transactions still processed. Now the store returns an error when the failure is simulated which the listener expects so that it can gracefully shutdown the system and close the context. The context must be closed correctly or the checkpointer won't save the last processed transactionId to the file system. Signed-off-by: Stanislav Jakuschevskij <stas@two-giants.com>
1 parent f465eb8 commit 43f67fb

File tree

5 files changed

+31
-17
lines changed

5 files changed

+31
-17
lines changed

off_chain_data/application-go/listen.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,20 @@ func listen(clientConnection *grpc.ClientConn) {
4343
fmt.Printf("Simulating a write failure every %d transactions\n", store.SimulatedFailureCount)
4444
}
4545

46-
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
46+
ctx, close := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
4747
defer func() {
48-
stop()
48+
close()
4949
fmt.Println("Context closed.")
5050
}()
5151

5252
network := gateway.GetNetwork(channelName)
5353
blocks, err := network.BlockEvents(
5454
ctx,
55+
// Used only if there is no checkpoint block number.
56+
// Order matters. WithStartBlock must be set before
57+
// WithCheckpoint to work.
58+
client.WithStartBlock(0),
5559
client.WithCheckpoint(checkpointer),
56-
client.WithStartBlock(0), // Used only if there is no checkpoint block number
5760
)
5861
if err != nil {
5962
panic(err)
@@ -76,11 +79,15 @@ func listen(clientConnection *grpc.ClientConn) {
7679
store.ApplyWritesToOffChainStore,
7780
channelName,
7881
)
79-
blockProcessor.Process()
82+
83+
if err := blockProcessor.Process(); err != nil && err.Error() == "[expected error]: simulated write failure" {
84+
fmt.Println(err.Error())
85+
return
86+
}
8087
}
8188
}
8289
}()
8390

8491
wg.Wait()
85-
fmt.Println("\nReceived 'SIGTERM' signal. Shutting down listener gracefully...")
92+
fmt.Println("\nShutting down listener gracefully...")
8693
}

off_chain_data/application-go/processor/block.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func NewBlock(
2929
}
3030
}
3131

32-
func (b *block) Process() {
32+
func (b *block) Process() error {
3333
blockNumber := b.parsedBlock.Number()
3434

3535
fmt.Println("\nReceived block", blockNumber)
@@ -42,13 +42,17 @@ func (b *block) Process() {
4242
b.writeToStore,
4343
b.channelName,
4444
}
45-
aTransaction.process()
45+
if err := aTransaction.process(); err != nil {
46+
return err
47+
}
4648

4749
transactionId := validTransaction.ChannelHeader().GetTxId()
4850
b.checkpointer.CheckpointTransaction(blockNumber, transactionId)
4951
}
5052

5153
b.checkpointer.CheckpointBlock(b.parsedBlock.Number())
54+
55+
return nil
5256
}
5357

5458
func (b *block) validTransactions() []*parser.Transaction {

off_chain_data/application-go/processor/transaction.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,25 @@ type transaction struct {
1414
channelName string
1515
}
1616

17-
func (t *transaction) process() {
17+
func (t *transaction) process() error {
1818
transactionId := t.transaction.ChannelHeader().GetTxId()
1919

2020
writes := t.writes()
2121
if len(writes) == 0 {
2222
fmt.Println("Skipping read-only or system transaction", transactionId)
23-
return
23+
return nil
2424
}
2525

2626
fmt.Println("Process transaction", transactionId)
2727

28-
t.writeToStore(store.LedgerUpdate{
28+
if err := t.writeToStore(store.LedgerUpdate{
2929
BlockNumber: t.blockNumber,
3030
TransactionId: transactionId,
3131
Writes: writes,
32-
})
32+
}); err != nil {
33+
return err
34+
}
35+
return nil
3336
}
3437

3538
func (t *transaction) writes() []store.Write {

off_chain_data/application-go/store/flatFille.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,13 @@ var transactionCount uint = 0 // Used only to simulate failures
1717

1818
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
1919
// This implementation just writes to a file.
20-
func ApplyWritesToOffChainStore(data LedgerUpdate) {
20+
func ApplyWritesToOffChainStore(data LedgerUpdate) error {
2121
if err := simulateFailureIfRequired(); err != nil {
22-
fmt.Println("[expected error]: " + err.Error())
23-
return
22+
return err
2423
}
2524

2625
writes := []string{}
2726
for _, write := range data.Writes {
28-
// TODO write also the TxID and block number so that you can compare easier to the output
2927
marshaled, err := json.Marshal(write)
3028
if err != nil {
3129
panic(err)
@@ -47,12 +45,14 @@ func ApplyWritesToOffChainStore(data LedgerUpdate) {
4745
if err := f.Close(); err != nil {
4846
panic(err)
4947
}
48+
49+
return nil
5050
}
5151

5252
func simulateFailureIfRequired() error {
5353
if SimulatedFailureCount > 0 && transactionCount >= SimulatedFailureCount {
5454
transactionCount = 0
55-
return errors.New("simulated write failure")
55+
return errors.New("[expected error]: simulated write failure")
5656
}
5757

5858
transactionCount += 1

off_chain_data/application-go/store/model.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package store
22

33
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
4-
type Writer = func(data LedgerUpdate)
4+
type Writer = func(data LedgerUpdate) error
55

66
// Ledger update made by a specific transaction.
77
type LedgerUpdate struct {

0 commit comments

Comments
 (0)