-
-
Notifications
You must be signed in to change notification settings - Fork 116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Blockwise: Use hash instead of token for cache #554
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,9 @@ import ( | |
"context" | ||
"errors" | ||
"fmt" | ||
"hash/crc64" | ||
"io" | ||
"net" | ||
"time" | ||
|
||
"github.com/dsnet/golib/memfile" | ||
|
@@ -131,14 +133,17 @@ type Client interface { | |
AcquireMessage(ctx context.Context) *pool.Message | ||
// return back the message to the pool for next use | ||
ReleaseMessage(m *pool.Message) | ||
|
||
// The remote address for determining the endpoint pair | ||
RemoteAddr() net.Addr | ||
} | ||
|
||
type BlockWise[C Client] struct { | ||
cc C | ||
receivingMessagesCache *cache.Cache[uint64, *messageGuard] | ||
sendingMessagesCache *cache.Cache[uint64, *pool.Message] | ||
errors func(error) | ||
getSentRequestFromOutside func(token message.Token) (*pool.Message, bool) | ||
getSentRequestFromOutside func(hash uint64) (*pool.Message, bool) | ||
expiration time.Duration | ||
} | ||
|
||
|
@@ -160,10 +165,10 @@ func New[C Client]( | |
cc C, | ||
expiration time.Duration, | ||
errors func(error), | ||
getSentRequestFromOutside func(token message.Token) (*pool.Message, bool), | ||
getSentRequestFromOutside func(hash uint64) (*pool.Message, bool), | ||
) *BlockWise[C] { | ||
if getSentRequestFromOutside == nil { | ||
getSentRequestFromOutside = func(message.Token) (*pool.Message, bool) { return nil, false } | ||
getSentRequestFromOutside = func(uint64) (*pool.Message, bool) { return nil, false } | ||
} | ||
return &BlockWise[C]{ | ||
cc: cc, | ||
|
@@ -214,11 +219,12 @@ func (b *BlockWise[C]) Do(r *pool.Message, maxSzx SZX, maxMessageSize uint32, do | |
if !ok { | ||
expire = time.Now().Add(b.expiration) | ||
} | ||
_, loaded := b.sendingMessagesCache.LoadOrStore(r.Token().Hash(), cache.NewElement(r, expire, nil)) | ||
matchableHash := generateMatchableHash(r.Options(), b.cc.RemoteAddr(), r.Code()) | ||
_, loaded := b.sendingMessagesCache.LoadOrStore(matchableHash, cache.NewElement(r, expire, nil)) | ||
if loaded { | ||
return nil, errors.New("invalid token") | ||
} | ||
defer b.sendingMessagesCache.Delete(r.Token().Hash()) | ||
defer b.sendingMessagesCache.Delete(matchableHash) | ||
if r.Body() == nil { | ||
return do(r) | ||
} | ||
|
@@ -282,9 +288,9 @@ func (b *BlockWise[C]) WriteMessage(request *pool.Message, maxSZX SZX, maxMessag | |
if err != nil { | ||
return fmt.Errorf("cannot encode start sending message block option(%v,%v,%v): %w", maxSZX, 0, true, err) | ||
} | ||
|
||
matchableHash := generateMatchableHash(request.Options(), b.cc.RemoteAddr(), request.Code()) | ||
w := newWriteRequestResponse(b.cc, request) | ||
err = b.startSendingMessage(w, maxSZX, maxMessageSize, startSendingMessageBlock) | ||
err = b.startSendingMessage(w, maxSZX, maxMessageSize, startSendingMessageBlock, matchableHash) | ||
if err != nil { | ||
return fmt.Errorf("cannot start writing request: %w", err) | ||
} | ||
|
@@ -333,8 +339,8 @@ func wantsToBeReceived(r *pool.Message) bool { | |
return true | ||
} | ||
|
||
func (b *BlockWise[C]) getSendingMessageCode(token uint64) (codes.Code, bool) { | ||
v := b.sendingMessagesCache.Load(token) | ||
func (b *BlockWise[C]) getSendingMessageCode(hash uint64) (codes.Code, bool) { | ||
v := b.sendingMessagesCache.Load(hash) | ||
if v == nil { | ||
return codes.Empty, false | ||
} | ||
|
@@ -348,19 +354,20 @@ func (b *BlockWise[C]) Handle(w *responsewriter.ResponseWriter[C], r *pool.Messa | |
} | ||
token := r.Token() | ||
|
||
matchableHash := generateMatchableHash(r.Options(), w.Conn().RemoteAddr(), r.Code()) | ||
|
||
if len(token) == 0 { | ||
err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, next) | ||
err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, next, matchableHash) | ||
if err != nil { | ||
b.sendEntityIncomplete(w, token) | ||
b.errors(fmt.Errorf("handleReceivedMessage(%v): %w", r, err)) | ||
} | ||
return | ||
} | ||
tokenStr := token.Hash() | ||
|
||
sendingMessageCode, sendingMessageExist := b.getSendingMessageCode(tokenStr) | ||
sendingMessageCode, sendingMessageExist := b.getSendingMessageCode(matchableHash) | ||
if !sendingMessageExist || wantsToBeReceived(r) { | ||
err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, next) | ||
err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, next, matchableHash) | ||
if err != nil { | ||
b.sendEntityIncomplete(w, token) | ||
b.errors(fmt.Errorf("handleReceivedMessage(%v): %w", r, err)) | ||
|
@@ -369,17 +376,17 @@ func (b *BlockWise[C]) Handle(w *responsewriter.ResponseWriter[C], r *pool.Messa | |
} | ||
more, err := b.continueSendingMessage(w, r, maxSZX, maxMessageSize, sendingMessageCode) | ||
if err != nil { | ||
b.sendingMessagesCache.Delete(tokenStr) | ||
b.sendingMessagesCache.Delete(matchableHash) | ||
b.errors(fmt.Errorf("continueSendingMessage(%v): %w", r, err)) | ||
return | ||
} | ||
// For codes GET,POST,PUT,DELETE, we want them to wait for pairing response and then delete them when the full response comes in or when timeout occurs. | ||
if !more && sendingMessageCode > codes.DELETE { | ||
b.sendingMessagesCache.Delete(tokenStr) | ||
b.sendingMessagesCache.Delete(matchableHash) | ||
} | ||
} | ||
|
||
func (b *BlockWise[C]) handleReceivedMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSZX SZX, maxMessageSize uint32, next func(w *responsewriter.ResponseWriter[C], r *pool.Message)) error { | ||
func (b *BlockWise[C]) handleReceivedMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSZX SZX, maxMessageSize uint32, next func(w *responsewriter.ResponseWriter[C], r *pool.Message), rxHash uint64) error { | ||
startSendingMessageBlock, err := EncodeBlockOption(maxSZX, 0, true) | ||
if err != nil { | ||
return fmt.Errorf("cannot encode start sending message block option(%v,%v,%v): %w", maxSZX, 0, true, err) | ||
|
@@ -411,7 +418,7 @@ func (b *BlockWise[C]) handleReceivedMessage(w *responsewriter.ResponseWriter[C] | |
return errP | ||
} | ||
} | ||
return b.startSendingMessage(w, maxSZX, maxMessageSize, startSendingMessageBlock) | ||
return b.startSendingMessage(w, maxSZX, maxMessageSize, startSendingMessageBlock, rxHash) | ||
} | ||
|
||
func (b *BlockWise[C]) createSendingMessage(sendingMessage *pool.Message, maxSZX SZX, maxMessageSize uint32, block uint32) (sendMessage *pool.Message, more bool, err error) { | ||
|
@@ -504,7 +511,8 @@ func (b *BlockWise[C]) continueSendingMessage(w *responsewriter.ResponseWriter[C | |
} | ||
var sendMessage *pool.Message | ||
var more bool | ||
b.sendingMessagesCache.LoadWithFunc(r.Token().Hash(), func(value *cache.Element[*pool.Message]) *cache.Element[*pool.Message] { | ||
matchableHash := generateMatchableHash(r.Options(), w.Conn().RemoteAddr(), r.Code()) | ||
b.sendingMessagesCache.LoadWithFunc(matchableHash, func(value *cache.Element[*pool.Message]) *cache.Element[*pool.Message] { | ||
sendMessage, more, err = b.createSendingMessage(value.Data(), maxSZX, maxMessageSize, block) | ||
if err != nil { | ||
err = fmt.Errorf("cannot create sending message: %w", err) | ||
|
@@ -529,7 +537,7 @@ func isObserveResponse(msg *pool.Message) bool { | |
return msg.Code() >= codes.Created | ||
} | ||
|
||
func (b *BlockWise[C]) startSendingMessage(w *responsewriter.ResponseWriter[C], maxSZX SZX, maxMessageSize uint32, block uint32) error { | ||
func (b *BlockWise[C]) startSendingMessage(w *responsewriter.ResponseWriter[C], maxSZX SZX, maxMessageSize uint32, block uint32, rxHash uint64) error { | ||
payloadSize, err := w.Message().BodySize() | ||
if err != nil { | ||
return payloadSizeError(err) | ||
|
@@ -552,16 +560,16 @@ func (b *BlockWise[C]) startSendingMessage(w *responsewriter.ResponseWriter[C], | |
if !ok { | ||
expire = time.Now().Add(b.expiration) | ||
} | ||
el, loaded := b.sendingMessagesCache.LoadOrStore(sendingMessage.Token().Hash(), cache.NewElement(originalSendingMessage, expire, nil)) | ||
el, loaded := b.sendingMessagesCache.LoadOrStore(rxHash, cache.NewElement(originalSendingMessage, expire, nil)) | ||
if loaded { | ||
defer b.cc.ReleaseMessage(originalSendingMessage) | ||
return fmt.Errorf("cannot add message (%v) to sending message cache: message(%v) with token(%v) already exist", originalSendingMessage, el.Data(), sendingMessage.Token()) | ||
} | ||
return nil | ||
} | ||
|
||
func (b *BlockWise[C]) getSentRequest(token message.Token) *pool.Message { | ||
data, ok := b.sendingMessagesCache.LoadWithFunc(token.Hash(), func(value *cache.Element[*pool.Message]) *cache.Element[*pool.Message] { | ||
func (b *BlockWise[C]) getSentRequest(hash uint64) *pool.Message { | ||
data, ok := b.sendingMessagesCache.LoadWithFunc(hash, func(value *cache.Element[*pool.Message]) *cache.Element[*pool.Message] { | ||
if value == nil { | ||
return nil | ||
} | ||
|
@@ -576,7 +584,7 @@ func (b *BlockWise[C]) getSentRequest(token message.Token) *pool.Message { | |
if ok { | ||
return data.Data() | ||
} | ||
globalRequest, ok := b.getSentRequestFromOutside(token) | ||
globalRequest, ok := b.getSentRequestFromOutside(hash) | ||
if ok { | ||
return globalRequest | ||
} | ||
|
@@ -595,7 +603,8 @@ func (b *BlockWise[C]) handleObserveResponse(sentRequest *pool.Message) (message | |
validUntil := time.Now().Add(b.expiration) // context of observation can be expired. | ||
bwSentRequest := b.cloneMessage(sentRequest) | ||
bwSentRequest.SetToken(token) | ||
_, loaded := b.sendingMessagesCache.LoadOrStore(token.Hash(), cache.NewElement(bwSentRequest, validUntil, nil)) | ||
matchableHash := generateMatchableHash(sentRequest.Options(), b.cc.RemoteAddr(), sentRequest.Code()) | ||
_, loaded := b.sendingMessagesCache.LoadOrStore(matchableHash, cache.NewElement(bwSentRequest, validUntil, nil)) | ||
if loaded { | ||
return nil, time.Time{}, errors.New("cannot process message: message with token already exist") | ||
} | ||
|
@@ -674,7 +683,7 @@ func copyToPayloadFromOffset(r *pool.Message, payloadFile *memfile.File, offset | |
return payloadSize, nil | ||
} | ||
|
||
func (b *BlockWise[C]) getCachedReceivedMessage(mg *messageGuard, r *pool.Message, tokenStr uint64, validUntil time.Time) (*pool.Message, func(), error) { | ||
func (b *BlockWise[C]) getCachedReceivedMessage(mg *messageGuard, r *pool.Message, matchableHash uint64, validUntil time.Time) (*pool.Message, func(), error) { | ||
cannotLockError := func(err error) error { | ||
return fmt.Errorf("processReceivedMessage: cannot lock message: %w", err) | ||
} | ||
|
@@ -708,11 +717,11 @@ func (b *BlockWise[C]) getCachedReceivedMessage(mg *messageGuard, r *pool.Messag | |
return nil, nil, cannotLockError(errA) | ||
} | ||
appendToClose(mg) | ||
element, loaded := b.receivingMessagesCache.LoadOrStore(tokenStr, cache.NewElement(mg, validUntil, func(d *messageGuard) { | ||
element, loaded := b.receivingMessagesCache.LoadOrStore(matchableHash, cache.NewElement(mg, validUntil, func(d *messageGuard) { | ||
if d == nil { | ||
return | ||
} | ||
b.sendingMessagesCache.Delete(tokenStr) | ||
b.sendingMessagesCache.Delete(matchableHash) | ||
})) | ||
// request was already stored in cache, silently | ||
if loaded { | ||
|
@@ -732,6 +741,38 @@ func (b *BlockWise[C]) getCachedReceivedMessage(mg *messageGuard, r *pool.Messag | |
return mg.Message, closeFn, nil | ||
} | ||
|
||
/* | ||
RFC9175 1.1: | ||
Two request messages are said to be "matchable" if they occur between | ||
the same endpoint pair, have the same code, and have the same set of | ||
options, with the exception that elective NoCacheKey options and | ||
options involved in block-wise transfer (Block1, Block2, and Request- | ||
Tag) need not be the same. Two blockwise request operations are said | ||
to be matchable if their request messages are matchable. | ||
|
||
This function concatenates the IDs and values of relevant options, the string representation of the remote address, | ||
and the code of the message to generate a hash that can be used to match requests. | ||
*/ | ||
func generateMatchableHash(options message.Options, remoteAddr net.Addr, code codes.Code) uint64 { | ||
input := make([]byte, 0, 512) | ||
|
||
for _, opt := range options { | ||
switch opt.ID { | ||
// Skip Blockwise Options and NoCacheKey Options | ||
case message.Block1, message.Block2, message.Size1, message.Size2, message.RequestTag: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure what this hash is used for, but you may want to exclude RequestTag here. Yes, the text says it is not part of the matchable set, but later on it says (roughly) that even if two requests are matchable, if they have different request tags, they should not be matched. When I wrote RFC9175, we had the choice of wording while not changing the actual mechanism: We went for easier for those implementing the client side (where Request-Tag is excluded for the definition of being matchable, and then if two requests are matchable even if you don't want them to be, you add a Request-Tag and there is an extra rule on it for the server), or easier for those implementing the server side (when Request-Tag would not have been special, but in all the client sections we would have said "if the request are matchable except for possibly differing Request-Tag values"). We went for the former -- I'm not sure it was the right decision, especially if generateMatchableHash is used on the server side. (I can't tell easily without having more context of go-coap, was just pointed to this issue by a colleague). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, the NoCacheKey options could more generally be recognzied from their option properties (lower bits of the number) rather than enumerating them (which will not be exhaustive). |
||
continue | ||
} | ||
input = append(input, byte(opt.ID)) | ||
input = append(input, opt.Value...) | ||
} | ||
|
||
input = append(input, []byte(remoteAddr.Network())...) | ||
input = append(input, []byte(remoteAddr.String())...) | ||
input = append(input, byte(code)) | ||
|
||
return crc64.Checksum(input, crc64.MakeTable(crc64.ISO)) | ||
} | ||
|
||
//nolint:gocyclo,gocognit | ||
func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSzx SZX, next func(w *responsewriter.ResponseWriter[C], r *pool.Message), blockType message.OptionID, sizeType message.OptionID) error { | ||
token := r.Token() | ||
|
@@ -755,7 +796,8 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C | |
if err != nil { | ||
return fmt.Errorf("cannot decode block option: %w", err) | ||
} | ||
sentRequest := b.getSentRequest(token) | ||
matchableHash := generateMatchableHash(r.Options(), w.Conn().RemoteAddr(), r.Code()) | ||
sentRequest := b.getSentRequest(matchableHash) | ||
if sentRequest != nil { | ||
defer b.cc.ReleaseMessage(sentRequest) | ||
} | ||
|
@@ -770,9 +812,8 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C | |
} | ||
} | ||
|
||
tokenStr := token.Hash() | ||
var cachedReceivedMessageGuard *messageGuard | ||
if e := b.receivingMessagesCache.Load(tokenStr); e != nil { | ||
if e := b.receivingMessagesCache.Load(matchableHash); e != nil { | ||
cachedReceivedMessageGuard = e.Data() | ||
} | ||
if cachedReceivedMessageGuard == nil { | ||
|
@@ -783,15 +824,15 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C | |
return nil | ||
} | ||
} | ||
cachedReceivedMessage, closeCachedReceivedMessage, err := b.getCachedReceivedMessage(cachedReceivedMessageGuard, r, tokenStr, validUntil) | ||
cachedReceivedMessage, closeCachedReceivedMessage, err := b.getCachedReceivedMessage(cachedReceivedMessageGuard, r, matchableHash, validUntil) | ||
if err != nil { | ||
return err | ||
} | ||
defer closeCachedReceivedMessage() | ||
|
||
defer func(err *error) { | ||
if *err != nil { | ||
b.receivingMessagesCache.Delete(tokenStr) | ||
b.receivingMessagesCache.Delete(matchableHash) | ||
} | ||
}(&err) | ||
payloadFile, payloadSize, err := b.getPayloadFromCachedReceivedMessage(r, cachedReceivedMessage) | ||
|
@@ -805,12 +846,12 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C | |
return fmt.Errorf("cannot copy data to payload: %w", err) | ||
} | ||
if !more { | ||
b.receivingMessagesCache.Delete(tokenStr) | ||
b.receivingMessagesCache.Delete(matchableHash) | ||
cachedReceivedMessage.Remove(blockType) | ||
cachedReceivedMessage.Remove(sizeType) | ||
cachedReceivedMessage.SetType(r.Type()) | ||
if !bytes.Equal(cachedReceivedMessage.Token(), token) { | ||
b.sendingMessagesCache.Delete(tokenStr) | ||
b.sendingMessagesCache.Delete(matchableHash) | ||
} | ||
_, errS := cachedReceivedMessage.Body().Seek(0, io.SeekStart) | ||
if errS != nil { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mpenate-ellenbytech Thank you for contribution :)
This is an issue: the hash for the response will not match the request hash because r.Code() is different, and r.Options(), such as the URI, are not part of the response. Therefore, you cannot pair it correctly for block1.
For sending a big payload (Block1):
You can use RemoteAddress and MessageID, which need to be registered for each new request of type Confirmation/NonConfirmation and response. If MessageID is not supported, then only the remote address can be used to verify block1.
For receiving a big payload (Block2):
When you want to locate the sent request, this will not work because the hash will again be different. Therefore, you need to pair it with the request using MessageID. However, you need to set a new MessageID for each exchange (request and response).
BTW: For blockwise transfer via TCP, the token must be used as implemented now.
Therefore, create blockwiseTCP.go (original code) and blockwiseUDP.go, where blockwiseUDP.go will utilize MessageIDs.