Skip to content

Commit

Permalink
feat: Add WithRequestMonitor Option
Browse files Browse the repository at this point in the history
Allow CoAP requests to be propagated to application handlers.

Signed-off-by: Jeff Welder <Jeff.Welder@ellenbytech.com>
a48812
  • Loading branch information
jeffwelder-ellenbytech committed Nov 6, 2023
1 parent a48812e commit 191de67
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 9 deletions.
7 changes: 7 additions & 0 deletions dtls/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type ErrorFunc = func(error)
// OnNewConnFunc is the callback for new connections.
type OnNewConnFunc = func(cc *udpClient.Conn)

// RequestMonitorFunc is the callback to see any requests.
type RequestMonitorFunc = func(cc *udpClient.Conn, req *pool.Message)

type GetMIDFunc = func() int32

var DefaultConfig = func() Config {
Expand All @@ -34,6 +37,9 @@ var DefaultConfig = func() Config {
}
return inactivity.New(timeout, onInactive)
},
RequestMonitor: func(cc *udpClient.Conn, req *pool.Message) {
// do nothing by default
},
OnNewConn: func(cc *udpClient.Conn) {
// do nothing by default
},
Expand All @@ -57,6 +63,7 @@ type Config struct {
GetMID GetMIDFunc
Handler HandlerFunc
OnNewConn OnNewConnFunc
RequestMonitor RequestMonitorFunc
TransmissionNStart uint32
TransmissionAcknowledgeTimeout time.Duration
TransmissionMaxRetransmit uint32
Expand Down
13 changes: 9 additions & 4 deletions dtls/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func New(opt ...Option) *Server {
return inactivity.NewNilMonitor[*udpClient.Conn]()
}
}

if cfg.MessagePool == nil {
cfg.MessagePool = pool.New(0, 0)
}
Expand Down Expand Up @@ -158,8 +159,10 @@ func (s *Server) Serve(l Listener) error {
}
wg.Add(1)
var cc *udpClient.Conn
monitor := s.cfg.CreateInactivityMonitor()
cc = s.createConn(coapNet.NewConn(rw), monitor)
inactivityMonitor := s.cfg.CreateInactivityMonitor()
requestMonitor := s.cfg.RequestMonitor

cc = s.createConn(coapNet.NewConn(rw), inactivityMonitor, requestMonitor)
if s.cfg.OnNewConn != nil {
s.cfg.OnNewConn(cc)
}
Expand All @@ -184,7 +187,7 @@ func (s *Server) Stop() {
}
}

func (s *Server) createConn(connection *coapNet.Conn, monitor udpClient.InactivityMonitor) *udpClient.Conn {
func (s *Server) createConn(connection *coapNet.Conn, inactivityMonitor udpClient.InactivityMonitor, requestMonitor udpClient.RequestMonitorFunc) *udpClient.Conn {
createBlockWise := func(cc *udpClient.Conn) *blockwise.BlockWise[*udpClient.Conn] {
return nil
}
Expand Down Expand Up @@ -220,10 +223,12 @@ func (s *Server) createConn(connection *coapNet.Conn, monitor udpClient.Inactivi
cfg.MessagePool = s.cfg.MessagePool
cfg.ReceivedMessageQueueSize = s.cfg.ReceivedMessageQueueSize
cfg.ProcessReceivedMessage = s.cfg.ProcessReceivedMessage

cc := udpClient.NewConn(
session,
createBlockWise,
monitor,
inactivityMonitor,
requestMonitor,
&cfg,
)

Expand Down
51 changes: 51 additions & 0 deletions options/commonOptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,57 @@ func WithOnNewConn[F OnNewConnFunc](onNewConn F) OnNewConnOpt[F] {
}
}

// WithRequestMonitor
type WithRequestMonitorFunc interface {
tcpServer.RequestMonitorFunc | udpServer.RequestMonitorFunc
}

// WithRequestMonitorOpt network option.
type WithRequestMonitorOpt[F WithRequestMonitorFunc] struct {
f F
}

func panicForInvalidWithRequestMonitorFunc(t, exp any) {
panic(fmt.Errorf("invalid WithRequestMonitorFunc type %T, expected %T", t, exp))
}

func (o WithRequestMonitorOpt[F]) UDPServerApply(cfg *udpServer.Config) {
switch v := any(o.f).(type) {
case udpServer.RequestMonitorFunc:
cfg.RequestMonitor = v
default:
var exp udpServer.RequestMonitorFunc
panicForInvalidWithRequestMonitorFunc(v, exp)
}
}

func (o WithRequestMonitorOpt[F]) DTLSServerApply(cfg *dtlsServer.Config) {
switch v := any(o.f).(type) {
case udpServer.RequestMonitorFunc:
cfg.RequestMonitor = v
default:
var exp udpServer.RequestMonitorFunc
panicForInvalidWithRequestMonitorFunc(v, exp)
}
}

func (o WithRequestMonitorOpt[F]) TCPServerApply(cfg *tcpServer.Config) {
switch v := any(o.f).(type) {
case tcpServer.RequestMonitorFunc:
cfg.RequestMonitor = v
default:
var exp tcpServer.RequestMonitorFunc
panicForInvalidWithRequestMonitorFunc(v, exp)
}
}

// WithRequestMonitor ping handler
func WithRequestMonitor[F WithRequestMonitorFunc](requestMonitor F) WithRequestMonitorOpt[F] {
return WithRequestMonitorOpt[F]{
f: requestMonitor,
}
}

// CloseSocketOpt close socket option.
type CloseSocketOpt struct{}

Expand Down
4 changes: 4 additions & 0 deletions tcp/client/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type (
EventFunc = func()
GetMIDFunc = func() int32
CreateInactivityMonitorFunc = func() InactivityMonitor
RequestMonitorFunc = func(cc *Conn, req *pool.Message)
)

type Notifier interface {
Expand Down Expand Up @@ -59,6 +60,7 @@ func NewConn(
connection *coapNet.Conn,
createBlockWise func(cc *Conn) *blockwise.BlockWise[*Conn],
inactivityMonitor InactivityMonitor,
requestMonitor RequestMonitorFunc,
cfg *Config,
) *Conn {
if cfg.GetToken == nil {
Expand All @@ -80,6 +82,7 @@ func NewConn(
cfg.DisableTCPSignalMessageCSM,
cfg.CloseSocket,
inactivityMonitor,
requestMonitor,
cfg.ConnectionCacheSize,
cfg.MessagePool,
)
Expand Down Expand Up @@ -336,6 +339,7 @@ func (cc *Conn) handleSignals(r *pool.Message) bool {
// if r.HasOption(message.TCPCustody) {
// TODO
// }
cc.session.requestMonitor(cc, r)
if err := cc.sendPong(r.Token()); err != nil && !coapNet.IsConnectionBrokenError(err) {
cc.Session().errors(fmt.Errorf("cannot handle ping signal: %w", err))
}
Expand Down
3 changes: 3 additions & 0 deletions tcp/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Session struct {
// See: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
sequence atomic.Uint64
inactivityMonitor InactivityMonitor
requestMonitor RequestMonitorFunc
errSendCSM error
cancel context.CancelFunc
done chan struct{}
Expand All @@ -48,6 +49,7 @@ func NewSession(
disableTCPSignalMessageCSM bool,
closeSocket bool,
inactivityMonitor InactivityMonitor,
requestMonitor RequestMonitorFunc,
connectionCacheSize uint16,
messagePool *pool.Pool,
) *Session {
Expand All @@ -69,6 +71,7 @@ func NewSession(
disableTCPSignalMessageCSM: disableTCPSignalMessageCSM,
closeSocket: closeSocket,
inactivityMonitor: inactivityMonitor,
requestMonitor: requestMonitor,
done: make(chan struct{}),
connectionCacheSize: connectionCacheSize,
messagePool: messagePool,
Expand Down
7 changes: 7 additions & 0 deletions tcp/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type GoPoolFunc = func(func()) error
// OnNewConnFunc is the callback for new connections.
type OnNewConnFunc = func(cc *client.Conn)

// RequestMonitorFunc is the callback to see any requests.
type RequestMonitorFunc = func(cc *client.Conn, req *pool.Message)

var DefaultConfig = func() Config {
opts := Config{
Common: config.NewCommon[*client.Conn](),
Expand All @@ -41,6 +44,9 @@ var DefaultConfig = func() Config {
OnNewConn: func(cc *client.Conn) {
// do nothing by default
},
RequestMonitor: func(cc *client.Conn, req *pool.Message) {
// do nothing by default
},
ConnectionCacheSize: 2 * 1024,
}
opts.Handler = func(w *responsewriter.ResponseWriter[*client.Conn], r *pool.Message) {
Expand All @@ -56,6 +62,7 @@ type Config struct {
CreateInactivityMonitor client.CreateInactivityMonitorFunc
Handler HandlerFunc
OnNewConn OnNewConnFunc
RequestMonitor RequestMonitorFunc
ConnectionCacheSize uint16
DisablePeerTCPSignalMessageCSMs bool
DisableTCPSignalMessageCSM bool
Expand Down
10 changes: 6 additions & 4 deletions tcp/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ func (s *Server) checkAcceptError(err error) bool {

func (s *Server) serveConnection(connections *connections.Connections, rw net.Conn) {
var cc *client.Conn
monitor := s.cfg.CreateInactivityMonitor()
cc = s.createConn(coapNet.NewConn(rw), monitor)
inactivityMonitor := s.cfg.CreateInactivityMonitor()
requestMonitor := s.cfg.RequestMonitor
cc = s.createConn(coapNet.NewConn(rw), inactivityMonitor, requestMonitor)
if s.cfg.OnNewConn != nil {
s.cfg.OnNewConn(cc)
}
Expand Down Expand Up @@ -182,7 +183,7 @@ func (s *Server) Stop() {
}
}

func (s *Server) createConn(connection *coapNet.Conn, monitor client.InactivityMonitor) *client.Conn {
func (s *Server) createConn(connection *coapNet.Conn, inactivityMonitor client.InactivityMonitor, requestMonitor RequestMonitorFunc) *client.Conn {
createBlockWise := func(cc *client.Conn) *blockwise.BlockWise[*client.Conn] {
return nil
}
Expand Down Expand Up @@ -215,7 +216,8 @@ func (s *Server) createConn(connection *coapNet.Conn, monitor client.InactivityM
cc := client.NewConn(
connection,
createBlockWise,
monitor,
inactivityMonitor,
requestMonitor,
&cfg,
)

Expand Down
12 changes: 11 additions & 1 deletion udp/client/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type (
EventFunc = func()
GetMIDFunc = func() int32
CreateInactivityMonitorFunc = func() InactivityMonitor
RequestMonitorFunc = func(cc *Conn, req *pool.Message)
)

type InactivityMonitor interface {
Expand Down Expand Up @@ -133,6 +134,7 @@ type Conn struct {
session Session
*client.Client[*Conn]
inactivityMonitor InactivityMonitor
requestMonitor RequestMonitorFunc

blockWise *blockwise.BlockWise[*Conn]
observationHandler *observation.Handler[*Conn]
Expand Down Expand Up @@ -189,6 +191,7 @@ func NewConn(
session Session,
createBlockWise func(cc *Conn) *blockwise.BlockWise[*Conn],
inactivityMonitor InactivityMonitor,
requestMonitor RequestMonitorFunc,
cfg *Config,
) *Conn {
if cfg.Errors == nil {
Expand Down Expand Up @@ -222,6 +225,7 @@ func NewConn(
msgIDMutex: NewMutexMap(),
responseMsgCache: cache.NewCache[string, []byte](),
inactivityMonitor: inactivityMonitor,
requestMonitor: requestMonitor,
messagePool: cfg.MessagePool,
numOutstandingInteraction: semaphore.NewWeighted(math.MaxInt64),
}
Expand Down Expand Up @@ -741,12 +745,17 @@ func (cc *Conn) handlePong(w *responsewriter.ResponseWriter[*Conn], r *pool.Mess
cc.sendPong(w, r)
}

func (cc *Conn) IsPingMessage(r *pool.Message) bool {
return r.Code() == codes.Empty && r.Type() == message.Confirmable && len(r.Token()) == 0 && len(r.Options()) == 0 && r.Body() == nil
}

func (cc *Conn) handleSpecialMessages(r *pool.Message) bool {
// ping request
if r.Code() == codes.Empty && r.Type() == message.Confirmable && len(r.Token()) == 0 && len(r.Options()) == 0 && r.Body() == nil {
if cc.IsPingMessage(r) {
cc.ProcessReceivedMessageWithHandler(r, cc.handlePong)
return true
}

// if waits for concrete message handler
if elem, ok := cc.midHandlerContainer.LoadAndDelete(r.MessageID()); ok {
elem.ReleaseMessage(cc)
Expand Down Expand Up @@ -782,6 +791,7 @@ func (cc *Conn) Process(datagram []byte) error {
req.SetSequence(cc.Sequence())
cc.checkMyMessageID(req)
cc.inactivityMonitor.Notify()
cc.requestMonitor(cc, req)
if cc.handleSpecialMessages(req) {
return nil
}
Expand Down
7 changes: 7 additions & 0 deletions udp/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type ErrorFunc = func(error)
// OnNewConnFunc is the callback for new connections.
type OnNewConnFunc = func(cc *udpClient.Conn)

// RequestMonitorFunc is the callback to see any requests.
type RequestMonitorFunc = func(cc *udpClient.Conn, req *pool.Message)

type GetMIDFunc = func() int32

var DefaultConfig = func() Config {
Expand All @@ -37,6 +40,9 @@ var DefaultConfig = func() Config {
OnNewConn: func(cc *udpClient.Conn) {
// do nothing by default
},
RequestMonitor: func(cc *udpClient.Conn, req *pool.Message) {
// do nothing by default
},
TransmissionNStart: 1,
TransmissionAcknowledgeTimeout: time.Second * 2,
TransmissionMaxRetransmit: 4,
Expand All @@ -57,6 +63,7 @@ type Config struct {
GetMID GetMIDFunc
Handler HandlerFunc
OnNewConn OnNewConnFunc
RequestMonitor RequestMonitorFunc
TransmissionNStart uint32
TransmissionAcknowledgeTimeout time.Duration
TransmissionMaxRetransmit uint32
Expand Down
2 changes: 2 additions & 0 deletions udp/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,12 @@ func (s *Server) getOrCreateConn(udpConn *coapNet.UDPConn, raddr *net.UDPAddr) (
cfg.ProcessReceivedMessage = s.cfg.ProcessReceivedMessage
cfg.ReceivedMessageQueueSize = s.cfg.ReceivedMessageQueueSize

requestMonitor := s.cfg.RequestMonitor
cc = client.NewConn(
session,
createBlockWise,
monitor,
requestMonitor,
&cfg,
)
cc.SetContextValue(closeKey, func() {
Expand Down

0 comments on commit 191de67

Please sign in to comment.