Skip to content

Commit 4323ec3

Browse files
committed
read loop
1 parent 95bed75 commit 4323ec3

File tree

3 files changed

+11
-48
lines changed

3 files changed

+11
-48
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ require (
4646
github.com/jedisct1/go-minisign v0.0.0-20230811132847-661be99b8267
4747
github.com/karalabe/hid v1.0.1-0.20240306101548-573246063e52
4848
github.com/kylelemons/godebug v1.1.0
49+
github.com/libp2p/go-buffer-pool v0.1.0
4950
github.com/mattn/go-colorable v0.1.13
5051
github.com/mattn/go-isatty v0.0.20
5152
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,8 @@ github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4F
238238
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
239239
github.com/leanovate/gopter v0.2.11 h1:vRjThO1EKPb/1NsDXuDrzldR28RLkBflWYcU9CvzWu4=
240240
github.com/leanovate/gopter v0.2.11/go.mod h1:aK3tzZP/C+p1m3SPRE4SYZFGP7jjkuSI4f7Xvpt0S9c=
241+
github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8=
242+
github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg=
241243
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
242244
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
243245
github.com/matryer/moq v0.0.0-20190312154309-6cfb0558e1bd/go.mod h1:9ELz6aaclSIGnZBoaSLZ3NAl1VTufbOrXBPvtcy6WiQ=

p2p/discover/v5_udp.go

Lines changed: 8 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ type UDPv5 struct {
9797
sendCh chan sendRequest
9898
sendNoRespCh chan *sendNoRespRequest
9999
unhandled chan<- ReadPacket
100-
writeCh chan pendingWrite // New channel for outgoing packets
101100

102101
// state of dispatch
103102
codec codecV5
@@ -113,12 +112,6 @@ type UDPv5 struct {
113112
wg sync.WaitGroup
114113
}
115114

116-
// pendingWrite holds data for a packet to be sent by the writeLoop.
117-
type pendingWrite struct {
118-
toAddr netip.AddrPort
119-
data []byte
120-
}
121-
122115
type sendRequest struct {
123116
destID enode.ID
124117
destAddr netip.AddrPort
@@ -163,10 +156,9 @@ func ListenV5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
163156
return nil, err
164157
}
165158
go t.tab.loop()
166-
t.wg.Add(3)
159+
t.wg.Add(2)
167160
go t.readLoop()
168161
go t.dispatch()
169-
go t.writeLoop()
170162
return t, nil
171163
}
172164

@@ -186,14 +178,13 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
186178
clock: cfg.Clock,
187179
respTimeout: cfg.V5RespTimeout,
188180
// channels into dispatch
189-
packetInCh: make(chan ReadPacket, 64),
181+
packetInCh: make(chan ReadPacket, 4),
190182
callCh: make(chan *callV5),
191183
callDoneCh: make(chan *callV5),
192184
sendCh: make(chan sendRequest),
193185
sendNoRespCh: make(chan *sendNoRespRequest),
194186
respTimeoutCh: make(chan *callTimeout),
195187
unhandled: cfg.Unhandled,
196-
writeCh: make(chan pendingWrite, 32), // Buffered channel for outgoing packets
197188
// state of dispatch
198189
codec: v5wire.NewCodec(ln, cfg.PrivateKey, cfg.Clock, cfg.V5ProtocolID),
199190
activeCallByNode: make(map[enode.ID]*callV5),
@@ -625,7 +616,6 @@ func (t *UDPv5) dispatch() {
625616
t.handlePacket(p.Data, p.Addr)
626617

627618
case <-t.closeCtx.Done():
628-
close(t.writeCh)
629619
for id, queue := range t.callQueue {
630620
for _, c := range queue {
631621
c.err <- errClosed
@@ -766,39 +756,9 @@ func (t *UDPv5) send(toID enode.ID, toAddr netip.AddrPort, packet v5wire.Packet,
766756
return nonce, err
767757
}
768758

759+
_, err = t.conn.WriteToUDPAddrPort(enc, toAddr)
769760
t.log.Trace(">> "+packet.Name(), t.logcontext...)
770-
771-
dataForSend := make([]byte, len(enc))
772-
copy(dataForSend, enc)
773-
774-
pw := pendingWrite{
775-
toAddr: toAddr,
776-
data: dataForSend, // codec.Encode should return a new slice, safe to pass directly.
777-
}
778-
779-
select {
780-
case t.writeCh <- pw:
781-
// Packet successfully queued.
782-
return nonce, nil
783-
case <-t.closeCtx.Done():
784-
return nonce, errClosed
785-
}
786-
}
787-
788-
// writeLoop runs in its own goroutine and sends packets from the writeCh to the network.
789-
func (t *UDPv5) writeLoop() {
790-
defer t.wg.Done()
791-
for pw := range t.writeCh { // Loop continues until writeCh is closed and empty.
792-
_, err := t.conn.WriteToUDPAddrPort(pw.data, pw.toAddr)
793-
if netutil.IsTemporaryError(err) {
794-
t.log.Debug("Temporary UDP write error", "addr", pw.toAddr, "err", err)
795-
} else if err != nil {
796-
if !errors.Is(err, net.ErrClosed) || !errors.Is(err, io.EOF) {
797-
t.log.Warn("UDP write error", "addr", pw.toAddr, "err", err)
798-
}
799-
return
800-
}
801-
}
761+
return nonce, err
802762
}
803763

804764
// readLoop runs in its own goroutine and reads packets from the network.
@@ -819,7 +779,9 @@ func (t *UDPv5) readLoop() {
819779
}
820780
return
821781
}
822-
t.dispatchReadPacket(from, buf[:nbytes])
782+
content := make([]byte, nbytes)
783+
copy(content, buf[:nbytes])
784+
t.dispatchReadPacket(from, content)
823785
}
824786
}
825787

@@ -829,10 +791,8 @@ func (t *UDPv5) dispatchReadPacket(from netip.AddrPort, content []byte) bool {
829791
if from.Addr().Is4In6() {
830792
from = netip.AddrPortFrom(netip.AddrFrom4(from.Addr().As4()), from.Port())
831793
}
832-
data := make([]byte, len(content))
833-
copy(data, content)
834794
select {
835-
case t.packetInCh <- ReadPacket{data, from}:
795+
case t.packetInCh <- ReadPacket{content, from}:
836796
return true
837797
case <-t.closeCtx.Done():
838798
return false

0 commit comments

Comments
 (0)