Skip to content

Commit 65d51a3

Browse files
committed
🐛 Cache first channel update
1 parent 12c78d3 commit 65d51a3

File tree

3 files changed

+72
-11
lines changed

3 files changed

+72
-11
lines changed

client/client.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,15 @@ import (
3434
//
3535
// Currently, only the two-party protocol is fully implemented.
3636
type Client struct {
37-
address wire.Address
38-
conn clientConn
39-
channels chanRegistry
40-
funder channel.Funder
41-
adjudicator channel.Adjudicator
42-
wallet wallet.Wallet
43-
pr persistence.PersistRestorer
44-
log log.Logger // structured logger for this client
37+
address wire.Address
38+
conn clientConn
39+
channels chanRegistry
40+
funder channel.Funder
41+
adjudicator channel.Adjudicator
42+
wallet wallet.Wallet
43+
pr persistence.PersistRestorer
44+
log log.Logger // structured logger for this client
45+
version1Cache version1Cache
4546

4647
sync.Closer
4748
}

client/proposal.go

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"bytes"
1919
"context"
2020
"fmt"
21+
"sync"
2122

2223
"github.com/pkg/errors"
2324

@@ -140,13 +141,16 @@ func (c *Client) ProposeChannel(ctx context.Context, prop ChannelProposal) (*Cha
140141
}
141142

142143
// 2. send proposal, wait for response, create channel object
144+
c.enableVer1Cache() // cache version 1 updates until channel is opened
143145
ch, err := c.proposeTwoPartyChannel(ctx, prop)
144146
if err != nil {
145147
return nil, errors.WithMessage(err, "channel proposal")
146148
}
147149

148150
// 3. fund
149-
return ch, c.fundChannel(ctx, ch, prop)
151+
fundingErr := c.fundChannel(ctx, ch, prop)
152+
c.releaseVer1Cache() // replay cached version 1 updates
153+
return ch, fundingErr
150154
}
151155

152156
// handleChannelProposal implements the receiving side of the (currently)
@@ -175,11 +179,15 @@ func (c *Client) handleChannelProposalAcc(
175179
return ch, errors.WithMessage(err, "validating channel proposal acceptance")
176180
}
177181

182+
c.enableVer1Cache() // cache version 1 updates
183+
178184
if ch, err = c.acceptChannelProposal(ctx, prop, p, acc); err != nil {
179185
return ch, errors.WithMessage(err, "accept channel proposal")
180186
}
181187

182-
return ch, c.fundChannel(ctx, ch, prop)
188+
fundingErr := c.fundChannel(ctx, ch, prop)
189+
c.releaseVer1Cache() // replay cached version 1 updates
190+
return ch, fundingErr
183191
}
184192

185193
func (c *Client) acceptChannelProposal(
@@ -549,6 +557,40 @@ func enableVer0Cache(ctx context.Context, c wire.Cacher) {
549557
})
550558
}
551559

560+
func (c *Client) enableVer1Cache() {
561+
c.log.Trace("Enabling version 1 cache")
562+
563+
c.version1Cache.mu.Lock()
564+
defer c.version1Cache.mu.Unlock()
565+
566+
c.version1Cache.enabled++
567+
}
568+
569+
func (c *Client) releaseVer1Cache() {
570+
c.log.Trace("Releasing version 1 cache")
571+
572+
c.version1Cache.mu.Lock()
573+
defer c.version1Cache.mu.Unlock()
574+
575+
c.version1Cache.enabled--
576+
for _, u := range c.version1Cache.cache {
577+
go c.handleChannelUpdate(u.uh, u.p, u.m)
578+
}
579+
c.version1Cache.cache = nil
580+
}
581+
582+
type version1Cache struct {
583+
mu sync.Mutex
584+
enabled uint // counter to support concurrent channel openings
585+
cache []cachedUpdate
586+
}
587+
588+
type cachedUpdate struct {
589+
uh UpdateHandler
590+
p wire.Address
591+
m *msgChannelUpdate
592+
}
593+
552594
// Error implements the error interface.
553595
func (e PeerRejectedProposalError) Error() string {
554596
return fmt.Sprintf("channel proposal rejected: %s", e.reason)

client/update.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,31 @@ import (
3434
func (c *Client) handleChannelUpdate(uh UpdateHandler, p wire.Address, m *msgChannelUpdate) {
3535
ch, ok := c.channels.Get(m.ID())
3636
if !ok {
37-
c.logChan(m.ID()).WithField("peer", p).Error("received update for unknown channel")
37+
if !c.cacheVersion1Update(uh, p, m) {
38+
c.logChan(m.ID()).WithField("peer", p).Error("received update for unknown channel")
39+
}
3840
return
3941
}
4042
pidx := ch.Idx() ^ 1
4143
ch.handleUpdateReq(pidx, m, uh)
4244
}
4345

46+
func (c *Client) cacheVersion1Update(uh UpdateHandler, p wire.Address, m *msgChannelUpdate) bool {
47+
c.version1Cache.mu.Lock()
48+
defer c.version1Cache.mu.Unlock()
49+
50+
if !(m.State.Version == 1 && c.version1Cache.enabled > 0) {
51+
return false
52+
}
53+
54+
c.version1Cache.cache = append(c.version1Cache.cache, cachedUpdate{
55+
uh: uh,
56+
p: p,
57+
m: m,
58+
})
59+
return true
60+
}
61+
4462
type (
4563
// ChannelUpdate is a channel update proposal.
4664
ChannelUpdate struct {

0 commit comments

Comments
 (0)