Skip to content

Commit 37f6187

Browse files
authored
Merge pull request #4 from perun-network/cache-first-update
🔀 Cache first channel update
2 parents bcc8c7a + 95631a3 commit 37f6187

File tree

7 files changed

+99
-23
lines changed

7 files changed

+99
-23
lines changed

client/client.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,15 @@ import (
3434
//
3535
// Currently, only the two-party protocol is fully implemented.
3636
type Client struct {
37-
address wire.Address
38-
conn clientConn
39-
channels chanRegistry
40-
funder channel.Funder
41-
adjudicator channel.Adjudicator
42-
wallet wallet.Wallet
43-
pr persistence.PersistRestorer
44-
log log.Logger // structured logger for this client
37+
address wire.Address
38+
conn clientConn
39+
channels chanRegistry
40+
funder channel.Funder
41+
adjudicator channel.Adjudicator
42+
wallet wallet.Wallet
43+
pr persistence.PersistRestorer
44+
log log.Logger // structured logger for this client
45+
version1Cache version1Cache
4546

4647
sync.Closer
4748
}

client/client_role_test.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,21 @@ func NewSetups(rng *rand.Rand, names []string) []ctest.RoleSetup {
3838

3939
for i := 0; i < n; i++ {
4040
acc := wtest.NewRandomAccount(rng)
41+
42+
// The use of a delayed funder simulates that channel participants may
43+
// receive their funding confirmation at different times.
44+
var funder channel.Funder
45+
if i == 0 {
46+
funder = &logFunderWithDelay{log.WithField("role", names[i])}
47+
} else {
48+
funder = &logFunder{log.WithField("role", names[i])}
49+
}
50+
4151
setup[i] = ctest.RoleSetup{
4252
Name: names[i],
4353
Identity: acc,
4454
Bus: bus,
45-
Funder: &logFunder{log.WithField("role", names[i])},
55+
Funder: funder,
4656
Adjudicator: &logAdjudicator{log.WithField("role", names[i]), sync.RWMutex{}, nil},
4757
Wallet: wtest.NewWallet(),
4858
Timeout: roleOperationTimeout,
@@ -57,6 +67,10 @@ type (
5767
log log.Logger
5868
}
5969

70+
logFunderWithDelay struct {
71+
log log.Logger
72+
}
73+
6074
logAdjudicator struct {
6175
log log.Logger
6276
mu sync.RWMutex
@@ -69,6 +83,12 @@ func (f *logFunder) Fund(_ context.Context, req channel.FundingReq) error {
6983
return nil
7084
}
7185

86+
func (f *logFunderWithDelay) Fund(_ context.Context, req channel.FundingReq) error {
87+
time.Sleep(100 * time.Millisecond)
88+
f.log.Infof("Funding: %v", req)
89+
return nil
90+
}
91+
7292
func (a *logAdjudicator) Register(_ context.Context, req channel.AdjudicatorReq) error {
7393
a.log.Infof("Register: %v", req)
7494
e := channel.NewRegisteredEvent(

client/proposal.go

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"bytes"
1919
"context"
2020
"fmt"
21+
"sync"
2122

2223
"github.com/pkg/errors"
2324

@@ -140,13 +141,16 @@ func (c *Client) ProposeChannel(ctx context.Context, prop ChannelProposal) (*Cha
140141
}
141142

142143
// 2. send proposal, wait for response, create channel object
144+
c.enableVer1Cache() // cache version 1 updates until channel is opened
143145
ch, err := c.proposeTwoPartyChannel(ctx, prop)
144146
if err != nil {
145147
return nil, errors.WithMessage(err, "channel proposal")
146148
}
147149

148150
// 3. fund
149-
return ch, c.fundChannel(ctx, ch, prop)
151+
fundingErr := c.fundChannel(ctx, ch, prop)
152+
c.releaseVer1Cache() // replay cached version 1 updates
153+
return ch, fundingErr
150154
}
151155

152156
// handleChannelProposal implements the receiving side of the (currently)
@@ -175,11 +179,15 @@ func (c *Client) handleChannelProposalAcc(
175179
return ch, errors.WithMessage(err, "validating channel proposal acceptance")
176180
}
177181

182+
c.enableVer1Cache() // cache version 1 updates
183+
178184
if ch, err = c.acceptChannelProposal(ctx, prop, p, acc); err != nil {
179185
return ch, errors.WithMessage(err, "accept channel proposal")
180186
}
181187

182-
return ch, c.fundChannel(ctx, ch, prop)
188+
fundingErr := c.fundChannel(ctx, ch, prop)
189+
c.releaseVer1Cache() // replay cached version 1 updates
190+
return ch, fundingErr
183191
}
184192

185193
func (c *Client) acceptChannelProposal(
@@ -549,6 +557,40 @@ func enableVer0Cache(ctx context.Context, c wire.Cacher) {
549557
})
550558
}
551559

560+
func (c *Client) enableVer1Cache() {
561+
c.log.Trace("Enabling version 1 cache")
562+
563+
c.version1Cache.mu.Lock()
564+
defer c.version1Cache.mu.Unlock()
565+
566+
c.version1Cache.enabled++
567+
}
568+
569+
func (c *Client) releaseVer1Cache() {
570+
c.log.Trace("Releasing version 1 cache")
571+
572+
c.version1Cache.mu.Lock()
573+
defer c.version1Cache.mu.Unlock()
574+
575+
c.version1Cache.enabled--
576+
for _, u := range c.version1Cache.cache {
577+
go c.handleChannelUpdate(u.uh, u.p, u.m)
578+
}
579+
c.version1Cache.cache = nil
580+
}
581+
582+
type version1Cache struct {
583+
mu sync.Mutex
584+
enabled uint // counter to support concurrent channel openings
585+
cache []cachedUpdate
586+
}
587+
588+
type cachedUpdate struct {
589+
uh UpdateHandler
590+
p wire.Address
591+
m *msgChannelUpdate
592+
}
593+
552594
// Error implements the error interface.
553595
func (e PeerRejectedProposalError) Error() string {
554596
return fmt.Sprintf("channel proposal rejected: %s", e.reason)

client/test/alice.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type Alice struct {
3535

3636
// NewAlice creates a new Proposer that executes the Alice protocol.
3737
func NewAlice(setup RoleSetup, t *testing.T) *Alice {
38-
return &Alice{Proposer: *NewProposer(setup, t, 4)}
38+
return &Alice{Proposer: *NewProposer(setup, t, 3)}
3939
}
4040

4141
// Execute executes the Alice protocol.
@@ -46,8 +46,6 @@ func (r *Alice) Execute(cfg ExecConfig) {
4646
func (r *Alice) exec(_cfg ExecConfig, ch *paymentChannel) {
4747
cfg := _cfg.(*AliceBobExecConfig)
4848
we, them := r.Idxs(cfg.Peers())
49-
// 1st stage - channel controller set up
50-
r.waitStage()
5149

5250
// 1st Alice receives some updates from Bob
5351
for i := 0; i < cfg.NumPayments[them]; i++ {

client/test/bob.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type Bob struct {
2626

2727
// NewBob creates a new Responder that executes the Bob protocol.
2828
func NewBob(setup RoleSetup, t *testing.T) *Bob {
29-
return &Bob{Responder: *NewResponder(setup, t, 4)}
29+
return &Bob{Responder: *NewResponder(setup, t, 3)}
3030
}
3131

3232
// Execute executes the Bob protocol.
@@ -38,9 +38,6 @@ func (r *Bob) exec(_cfg ExecConfig, ch *paymentChannel, propHandler *acceptNextP
3838
cfg := _cfg.(*AliceBobExecConfig)
3939
we, them := r.Idxs(cfg.Peers())
4040

41-
// 1st stage - channel controller set up
42-
r.waitStage()
43-
4441
// 1st Bob sends some updates to Alice
4542
for i := 0; i < cfg.NumPayments[we]; i++ {
4643
ch.sendTransfer(cfg.TxAmounts[we], fmt.Sprintf("Bob#%d", i))

client/test/proposer.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ func (r *Proposer) Execute(cfg ExecConfig, exec func(ExecConfig, *paymentChannel
3636
rng := pkgtest.Prng(r.t, "proposer")
3737
assert := assert.New(r.t)
3838

39+
// ignore proposal handler since Proposer doesn't accept any incoming channels
40+
_, wait := r.GoHandle(rng)
41+
defer wait()
42+
3943
prop := r.LedgerChannelProposal(rng, cfg)
4044
ch, err := r.ProposeChannel(prop)
4145
assert.NoError(err)
@@ -45,10 +49,6 @@ func (r *Proposer) Execute(cfg ExecConfig, exec func(ExecConfig, *paymentChannel
4549
}
4650
r.log.Infof("New Channel opened: %v", ch.Channel)
4751

48-
// ignore proposal handler since Proposer doesn't accept any incoming channels
49-
_, wait := r.GoHandle(rng)
50-
defer wait()
51-
5252
exec(cfg, ch)
5353

5454
ch.Close() // May or may not already be closed due to channelConn closing.

client/update.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,31 @@ import (
3434
func (c *Client) handleChannelUpdate(uh UpdateHandler, p wire.Address, m *msgChannelUpdate) {
3535
ch, ok := c.channels.Get(m.ID())
3636
if !ok {
37-
c.logChan(m.ID()).WithField("peer", p).Error("received update for unknown channel")
37+
if !c.cacheVersion1Update(uh, p, m) {
38+
c.logChan(m.ID()).WithField("peer", p).Error("received update for unknown channel")
39+
}
3840
return
3941
}
4042
pidx := ch.Idx() ^ 1
4143
ch.handleUpdateReq(pidx, m, uh)
4244
}
4345

46+
func (c *Client) cacheVersion1Update(uh UpdateHandler, p wire.Address, m *msgChannelUpdate) bool {
47+
c.version1Cache.mu.Lock()
48+
defer c.version1Cache.mu.Unlock()
49+
50+
if !(m.State.Version == 1 && c.version1Cache.enabled > 0) {
51+
return false
52+
}
53+
54+
c.version1Cache.cache = append(c.version1Cache.cache, cachedUpdate{
55+
uh: uh,
56+
p: p,
57+
m: m,
58+
})
59+
return true
60+
}
61+
4462
type (
4563
// ChannelUpdate is a channel update proposal.
4664
ChannelUpdate struct {

0 commit comments

Comments
 (0)