@@ -36,7 +36,7 @@ var log = logging.Logger("dsqueue")
3636// then queued items can remain in memory. When the queue is closed, any
3737// remaining items in memory are written to the datastore.
3838type DSQueue struct {
39- close context.CancelFunc
39+ cancel context.CancelFunc
4040 closed chan error
4141 closeOnce sync.Once
4242 dequeue chan []byte
@@ -54,7 +54,7 @@ func New(ds datastore.Batching, name string, options ...Option) *DSQueue {
5454 ctx , cancel := context .WithCancel (context .Background ())
5555
5656 q := & DSQueue {
57- close : cancel ,
57+ cancel : cancel ,
5858 closed : make (chan error , 1 ),
5959 dequeue : make (chan []byte ),
6060 ds : namespace .Wrap (ds , datastore .NewKey ("/dsq-" + name )),
@@ -84,7 +84,7 @@ func (q *DSQueue) Close() error {
8484 select {
8585 case <- q .closed :
8686 case <- timeoutCh :
87- q .close () // force immediate shutdown
87+ q .cancel () // force immediate shutdown
8888 err = <- q .closed
8989 }
9090 close (q .dequeue ) // no more output from this queue
@@ -150,21 +150,30 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
150150
151151 defer func () {
152152 if item != "" {
153+ // Write the item directly, instead of pushing it to the front of
154+ // inbuf, in order to retain it's original kay, and therefore the
155+ // order in the datastore, which may not be empty.
153156 if err := q .ds .Put (ctx , k , nil ); err != nil {
154- log .Errorw ("failed to write item to datastore" , "err" , err , "qname" , q .name )
157+ if ! errors .Is (err , context .Canceled ) {
158+ log .Errorw ("failed to write item to datastore" , "err" , err , "qname" , q .name )
159+ }
160+ q .closed <- fmt .Errorf ("%d items not written to datastore" , 1 + inBuf .Len ())
161+ return
155162 }
156163 }
157164 if inBuf .Len () != 0 {
158165 err := q .commitInput (ctx , counter , & inBuf )
159- if err != nil && ! errors .Is (err , context .Canceled ) {
160- log .Errorw ("error writing items to datastore" , "err" , err , "qname" , q .name )
166+ if err != nil {
167+ if ! errors .Is (err , context .Canceled ) {
168+ log .Errorw ("error writing items to datastore" , "err" , err , "qname" , q .name )
169+ }
161170 if inBuf .Len () != 0 {
162171 q .closed <- fmt .Errorf ("%d items not written to datastore" , inBuf .Len ())
163172 }
164173 }
165174 }
166175 if err := q .ds .Sync (ctx , datastore .NewKey ("" )); err != nil {
167- q . closed <- fmt . Errorf ( "cannot sync datastore: %w" , err )
176+ log . Errorw ( "failed to sync datastore" , "err" , err , "qname" , q . name )
168177 }
169178 }()
170179
@@ -378,6 +387,10 @@ func (q *DSQueue) getQueueHead(ctx context.Context) (*query.Entry, error) {
378387}
379388
380389func (q * DSQueue ) commitInput (ctx context.Context , counter uint64 , items * deque.Deque [string ]) error {
390+ if ctx .Err () != nil {
391+ return ctx .Err ()
392+ }
393+
381394 b , err := q .ds .Batch (ctx )
382395 if err != nil {
383396 return fmt .Errorf ("failed to create batch: %w" , err )
0 commit comments