Skip to content

Commit

Permalink
[udp] make response message cache configurable
Browse files Browse the repository at this point in the history
Modifies UDP conn implementation to allow for a custom response message
cache to be provided. This change is implemented in such a way that API
compatibility with previous v3 releases is preserved. Users may now
provide an option to supply their own cache implementation. Structured
messages, rather than their serialized representation, are now passed to
the cache implementation to allow for caching decisions to be made in
the cache implementation. For example, it may be desirable to skip
caching blockwise message responses if the entire underlying data being
transferred is also cached. The cache implementation is responsible for
cloning messages or otherwise ensuring that it is not storing data that
may subsequently be modified.

Signed-off-by: Daniel Mangum <georgedanielmangum@gmail.com>
  • Loading branch information
hasheddan authored and jkralik committed Nov 8, 2024
1 parent 2404bac commit 71407da
Showing 1 changed file with 82 additions and 27 deletions.
109 changes: 82 additions & 27 deletions udp/client/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math"
"net"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -127,6 +128,58 @@ func (m *midElement) GetMessage(cc *Conn) (*pool.Message, bool, error) {
return msg, true, nil
}

// MessageCache is a cache of CoAP messages.
type MessageCache interface {
Load(key string, msg *pool.Message) (bool, error)
Store(key string, msg *pool.Message) error
CheckExpirations(time.Time)
}

// messageCache is a CoAP message cache backed by an in-memory cache.
type messageCache struct {
c *cache.Cache[string, []byte]
}

// newMessageCache constructs a new CoAP message cache.
func newMessageCache() *messageCache {
return &messageCache{
c: cache.NewCache[string, []byte](),
}
}

// Load loads a message from the cache if one exists with key.
func (m *messageCache) Load(key string, msg *pool.Message) (bool, error) {
cachedResp := m.c.Load(key)
if cachedResp == nil {
return false, nil
}
if rawMsg := cachedResp.Data(); len(rawMsg) > 0 {
_, err := msg.UnmarshalWithDecoder(coder.DefaultCoder, rawMsg)
if err != nil {
return false, err
}
return true, nil
}
return false, nil
}

// Store stores a message in the cache.
func (m *messageCache) Store(key string, msg *pool.Message) error {
marshaledResp, err := msg.MarshalWithEncoder(coder.DefaultCoder)
if err != nil {
return err
}
cacheMsg := make([]byte, len(marshaledResp))
copy(cacheMsg, marshaledResp)
m.c.LoadOrStore(key, cache.NewElement(cacheMsg, time.Now().Add(ExchangeLifetime), nil))
return nil
}

// CheckExpirations checks the cache for any expirations.
func (m *messageCache) CheckExpirations(now time.Time) {
m.c.CheckExpirations(now)
}

// Conn represents a virtual connection to a conceptual endpoint, to perform COAPs commands.
type Conn struct {
// This field needs to be the first in the struct to ensure proper word alignment on 32-bit platforms.
Expand All @@ -145,7 +198,7 @@ type Conn struct {

processReceivedMessage config.ProcessReceivedMessageFunc[*Conn]
errors ErrorFunc
responseMsgCache *cache.Cache[string, []byte]
responseMsgCache MessageCache
msgIDMutex *MutexMap

tokenHandlerContainer *coapSync.Map[uint64, HandlerFunc]
Expand Down Expand Up @@ -192,6 +245,7 @@ type ConnOptions struct {
createBlockWise func(cc *Conn) *blockwise.BlockWise[*Conn]
inactivityMonitor InactivityMonitor
requestMonitor RequestMonitorFunc
responseMsgCache MessageCache
}

type Option = func(opts *ConnOptions)
Expand Down Expand Up @@ -220,6 +274,23 @@ func WithRequestMonitor(requestMonitor RequestMonitorFunc) Option {
}
}

// WithResponseMessageCache sets the cache used for response messages. All
// response messages are submitted to the cache, but it is up to the cache
// implementation to determine which messages are stored and for how long.
// Caching responses enables sending the same Acknowledgment for retransmitted
// confirmable messages within an EXCHANGE_LIFETIME. It may be desirable to
// relax this behavior in some scenarios.
// https://datatracker.ietf.org/doc/html/rfc7252#section-4.5
// The default response message cache stores all responses with an expiration of
// 247 seconds, which is EXCHANGE_LIFETIME when using default CoAP transmission
// parameters.
// https://datatracker.ietf.org/doc/html/rfc7252#section-4.8.2
func WithResponseMessageCache(cache MessageCache) Option {
return func(opts *ConnOptions) {
opts.responseMsgCache = cache
}
}

func NewConnWithOpts(session Session, cfg *Config, opts ...Option) *Conn {
if cfg.Errors == nil {
cfg.Errors = func(error) {
Expand Down Expand Up @@ -248,6 +319,10 @@ func NewConnWithOpts(session Session, cfg *Config, opts ...Option) *Conn {
for _, o := range opts {
o(&cfgOpts)
}
// Only construct cache if one was not set via options.
if cfgOpts.responseMsgCache == nil {
cfgOpts.responseMsgCache = newMessageCache()
}
cc := Conn{
session: session,
transmission: &Transmission{
Expand All @@ -262,7 +337,7 @@ func NewConnWithOpts(session Session, cfg *Config, opts ...Option) *Conn {
processReceivedMessage: cfg.ProcessReceivedMessage,
errors: cfg.Errors,
msgIDMutex: NewMutexMap(),
responseMsgCache: cache.NewCache[string, []byte](),
responseMsgCache: cfgOpts.responseMsgCache,
inactivityMonitor: cfgOpts.inactivityMonitor,
requestMonitor: cfgOpts.requestMonitor,
messagePool: cfg.MessagePool,
Expand Down Expand Up @@ -609,34 +684,14 @@ func (cc *Conn) Sequence() uint64 {
return cc.sequence.Add(1)
}

func (cc *Conn) responseMsgCacheID(msgID int32) string {
return fmt.Sprintf("resp-%v-%d", cc.RemoteAddr(), msgID)
// getResponseFromCache gets a message from the response message cache.
func (cc *Conn) getResponseFromCache(mid int32, resp *pool.Message) (bool, error) {
return cc.responseMsgCache.Load(strconv.Itoa(int(mid)), resp)
}

// addResponseToCache adds a message to the response message cache.
func (cc *Conn) addResponseToCache(resp *pool.Message) error {
marshaledResp, err := resp.MarshalWithEncoder(coder.DefaultCoder)
if err != nil {
return err
}
cacheMsg := make([]byte, len(marshaledResp))
copy(cacheMsg, marshaledResp)
cc.responseMsgCache.LoadOrStore(cc.responseMsgCacheID(resp.MessageID()), cache.NewElement(cacheMsg, time.Now().Add(ExchangeLifetime), nil))
return nil
}

func (cc *Conn) getResponseFromCache(mid int32, resp *pool.Message) (bool, error) {
cachedResp := cc.responseMsgCache.Load(cc.responseMsgCacheID(mid))
if cachedResp == nil {
return false, nil
}
if rawMsg := cachedResp.Data(); len(rawMsg) > 0 {
_, err := resp.UnmarshalWithDecoder(coder.DefaultCoder, rawMsg)
if err != nil {
return false, err
}
return true, nil
}
return false, nil
return cc.responseMsgCache.Store(strconv.Itoa(int(resp.MessageID())), resp)
}

// checkMyMessageID compare client msgID against peer messageID and if it is near < 0xffff/4 then increase msgID.
Expand Down

0 comments on commit 71407da

Please sign in to comment.