@@ -41,7 +41,7 @@ type DSQueue struct {
4141 closeOnce sync.Once
4242 dequeue chan []byte
4343 ds datastore.Batching
44- enqueue chan string
44+ enqueue chan [] byte
4545 clear chan chan <- int
4646 closeTimeout time.Duration
4747 getn chan getRequest
@@ -59,7 +59,7 @@ func New(ds datastore.Batching, name string, options ...Option) *DSQueue {
5959 closed : make (chan error , 1 ),
6060 dequeue : make (chan []byte ),
6161 ds : namespace .Wrap (ds , datastore .NewKey ("/dsq-" + name )),
62- enqueue : make (chan string ),
62+ enqueue : make (chan [] byte ),
6363 clear : make (chan chan <- int ),
6464 closeTimeout : cfg .closeTimeout ,
6565 getn : make (chan getRequest ),
@@ -105,7 +105,7 @@ func (q *DSQueue) Put(item []byte) (err error) {
105105 }
106106 }()
107107
108- q .enqueue <- string ( item )
108+ q .enqueue <- item
109109 return
110110}
111111
@@ -157,8 +157,8 @@ func (q *DSQueue) Name() string {
157157 return q .name
158158}
159159
160- func makeKey (item string , counter uint64 ) datastore.Key {
161- b64Item := base64 .RawURLEncoding .EncodeToString ([] byte ( item ) )
160+ func makeKey (item [] byte , counter uint64 ) datastore.Key {
161+ b64Item := base64 .RawURLEncoding .EncodeToString (item )
162162 return datastore .NewKey (fmt .Sprintf ("%016x/%s" , counter , b64Item ))
163163}
164164
@@ -167,9 +167,9 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
167167 defer close (q .closed )
168168
169169 var (
170- item string
170+ item [] byte
171171 counter uint64
172- inBuf deque.Deque [string ]
172+ inBuf deque.Deque [[] byte ]
173173 )
174174
175175 const baseCap = 1024
@@ -181,7 +181,7 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
181181 }
182182
183183 defer func () {
184- if item != "" {
184+ if item != nil {
185185 // Write the item directly, instead of pushing it to the front of
186186 // inbuf, in order to retain it's original kay, and therefore the
187187 // order in the datastore, which may not be empty.
@@ -226,7 +226,7 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
226226 }
227227
228228 for {
229- if item == "" {
229+ if item == nil {
230230 if ! dsEmpty {
231231 head , err := q .getQueueHead (ctx )
232232 if err != nil {
@@ -244,12 +244,11 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
244244 log .Errorw ("malformed queued item, removing it from queue" , "err" , err , "key" , head .Key , "qname" , q .name )
245245 continue
246246 }
247- itemBin , err : = base64 .RawURLEncoding .DecodeString (parts [1 ])
247+ item , err = base64 .RawURLEncoding .DecodeString (parts [1 ])
248248 if err != nil {
249249 log .Errorw ("error decoding queued item, removing it from queue" , "err" , err , "key" , head .Key , "qname" , q .name )
250250 continue
251251 }
252- item = string (itemBin )
253252 } else {
254253 dsEmpty = true
255254 }
@@ -265,7 +264,7 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
265264
266265 // If c != cid.Undef set dequeue and attempt write.
267266 var dequeue chan []byte
268- if item != "" {
267+ if item != nil {
269268 dequeue = q .dequeue
270269 }
271270
@@ -275,15 +274,16 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
275274 return
276275 }
277276 if dedupCache != nil {
278- if found , _ := dedupCache .ContainsOrAdd (toQueue , struct {}{}); found {
277+ cacheItem := string (toQueue )
278+ if found , _ := dedupCache .ContainsOrAdd (cacheItem , struct {}{}); found {
279279 // update recentness in LRU cache
280- dedupCache .Add (toQueue , struct {}{})
280+ dedupCache .Add (cacheItem , struct {}{})
281281 continue
282282 }
283283 }
284284 idle = false
285285
286- if item == "" {
286+ if item == nil {
287287 // Use this CID as the next output since there was nothing in
288288 // the datastore or buffer previously.
289289 item = toQueue
@@ -301,8 +301,8 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
301301 rspChan := getRequest .rsp
302302 var outItems [][]byte
303303
304- if item != "" {
305- outItems = append (outItems , [] byte ( item ) )
304+ if item != nil {
305+ outItems = append (outItems , item )
306306
307307 if ! dsEmpty {
308308 outItems , err = q .readDatastore (ctx , n - len (outItems ), outItems )
@@ -314,12 +314,12 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
314314 }
315315 }
316316
317- item = ""
317+ item = nil
318318 idle = false
319319 }
320320 if len (outItems ) < n {
321321 for itm := range inBuf .IterPopFront () {
322- outItems = append (outItems , [] byte ( itm ) )
322+ outItems = append (outItems , itm )
323323 if len (outItems ) == n {
324324 break
325325 }
@@ -329,8 +329,8 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
329329 items : outItems ,
330330 }
331331
332- case dequeue <- [] byte ( item ) :
333- item = ""
332+ case dequeue <- item :
333+ item = nil
334334 idle = false
335335
336336 case <- batchTimer .C :
@@ -339,7 +339,7 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
339339 commit = true
340340 } else {
341341 if inBuf .Cap () > baseCap {
342- inBuf = deque.Deque [string ]{}
342+ inBuf = deque.Deque [[] byte ]{}
343343 inBuf .SetBaseCap (baseCap )
344344 }
345345 }
@@ -352,10 +352,10 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
352352
353353 case rsp := <- q .clear :
354354 var rmMemCount int
355- if item != "" {
355+ if item != nil {
356356 rmMemCount = 1
357357 }
358- item = ""
358+ item = nil
359359 k = datastore.Key {}
360360 idle = false
361361 rmMemCount += inBuf .Len ()
@@ -452,7 +452,7 @@ func (q *DSQueue) getQueueHead(ctx context.Context) (*query.Entry, error) {
452452 return & r .Entry , r .Error
453453}
454454
455- func (q * DSQueue ) commitInput (ctx context.Context , counter uint64 , items * deque.Deque [string ]) error {
455+ func (q * DSQueue ) commitInput (ctx context.Context , counter uint64 , items * deque.Deque [[] byte ]) error {
456456 if ctx .Err () != nil {
457457 return ctx .Err ()
458458 }
@@ -527,12 +527,12 @@ func (q *DSQueue) readDatastore(ctx context.Context, n int, items [][]byte) ([][
527527 log .Errorw ("malformed queued item, removing it from queue" , "err" , err , "key" , result .Key , "qname" , q .name )
528528 continue
529529 }
530- itemBin , err := base64 .RawURLEncoding .DecodeString (parts [1 ])
530+ item , err := base64 .RawURLEncoding .DecodeString (parts [1 ])
531531 if err != nil {
532532 log .Errorw ("error decoding queued item, removing it from queue" , "err" , err , "key" , result .Key , "qname" , q .name )
533533 continue
534534 }
535- items = append (items , itemBin )
535+ items = append (items , item )
536536 }
537537
538538 if err = batch .Commit (ctx ); err != nil {
0 commit comments