Skip to content

Commit

Permalink
Update UDP: Ensure propagation of control message to pool.Message
Browse files Browse the repository at this point in the history
This commit enhances the UDP functionality, ensuring proper dissemination
of control messages to pool.Message for improved network coordination
and responsiveness
  • Loading branch information
jkralik committed Nov 13, 2023
1 parent 1427972 commit 2249b85
Show file tree
Hide file tree
Showing 13 changed files with 751 additions and 315 deletions.
2 changes: 1 addition & 1 deletion dtls/server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (s *Session) Run(cc *client.Conn) (err error) {
return fmt.Errorf("cannot read from connection: %w", err)
}
readBuf = readBuf[:readLen]
err = cc.Process(readBuf)
err = cc.Process(nil, readBuf)
if err != nil {
return err
}
Expand Down
20 changes: 20 additions & 0 deletions message/pool/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/plgd-dev/go-coap/v3/message"
"github.com/plgd-dev/go-coap/v3/message/codes"
"github.com/plgd-dev/go-coap/v3/net"
"go.uber.org/atomic"
)

Expand All @@ -26,6 +27,7 @@ type Message struct {
// Context context of request.
ctx context.Context
msg message.Message
controlMessage *net.ControlMessage // control message for UDP
hijacked atomic.Bool
isModified bool
valueBuffer []byte
Expand Down Expand Up @@ -73,6 +75,22 @@ func (r *Message) SetMessage(message message.Message) {
r.isModified = true
}

func (r *Message) SetControlMessage(cm *net.ControlMessage) {
r.controlMessage = cm
}

func (r *Message) ControlMessage() *net.ControlMessage {
return r.controlMessage
}

// UpsertControlMessage set value only when origin value is not set.
func (r *Message) UpsertControlMessage(cm *net.ControlMessage) {
if r.controlMessage != nil {
return
}
r.SetControlMessage(cm)
}

// SetMessageID only 0 to 2^16-1 are valid.
func (r *Message) SetMessageID(mid int32) {
r.msg.MessageID = mid
Expand Down Expand Up @@ -120,6 +138,7 @@ func (r *Message) Reset() {
r.valueBuffer = r.origValueBuffer
r.body = nil
r.isModified = false
r.controlMessage = nil
if cap(r.bufferMarshal) > 1024 {
r.bufferMarshal = make([]byte, 256)
}
Expand Down Expand Up @@ -568,6 +587,7 @@ func (r *Message) Clone(msg *Message) error {
msg.ResetOptionsTo(r.Options())
msg.SetType(r.Type())
msg.SetMessageID(r.MessageID())
msg.SetControlMessage(r.ControlMessage())

if r.Body() != nil {
buf := bytes.NewBuffer(nil)
Expand Down
Loading

0 comments on commit 2249b85

Please sign in to comment.