Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions dsqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type DSQueue struct {
enqueue chan string
clear chan chan<- int
closeTimeout time.Duration
getn chan getRequest
name string
}

Expand All @@ -61,6 +62,7 @@ func New(ds datastore.Batching, name string, options ...Option) *DSQueue {
enqueue: make(chan string),
clear: make(chan chan<- int),
closeTimeout: cfg.closeTimeout,
getn: make(chan getRequest),
name: name,
}

Expand Down Expand Up @@ -107,6 +109,36 @@ func (q *DSQueue) Put(item []byte) (err error) {
return
}

type getRequest struct {
n int
rsp chan getResponse
}

type getResponse struct {
items [][]byte
err error
}

// GetN retrieves up to n items that are currently available in the queue. If
// there are no items currently available, then none are returned and GetN does
// not wait for any.
//
// GetN is used to poll the DSQueue for items and return batches of those
// items. This is the most efficient way of fetching currently available items.
//
// GetN and Out can both be used to read items from the DSQueue, but they
// should not be used concurrently as items will be returned by one or the
// other indeterminately.
func (q *DSQueue) GetN(n int) ([][]byte, error) {
rsp := make(chan getResponse)
q.getn <- getRequest{
n: n,
rsp: rsp,
}
getRsp := <-rsp
return getRsp.items, getRsp.err
}

// Out returns a channel that for reading entries from the queue,
func (q *DSQueue) Out() <-chan []byte {
return q.dequeue
Expand Down Expand Up @@ -264,9 +296,43 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
if bufferSize != 0 && inBuf.Len() >= bufferSize {
commit = true
}
case getRequest := <-q.getn:
n := getRequest.n
rspChan := getRequest.rsp
var outItems [][]byte

if item != "" {
outItems = append(outItems, []byte(item))

if !dsEmpty {
outItems, err = q.readDatastore(ctx, n-len(outItems), outItems)
if err != nil {
rspChan <- getResponse{
err: err,
}
continue
}
}

item = ""
idle = false
}
if len(outItems) < n {
for itm := range inBuf.IterPopFront() {
outItems = append(outItems, []byte(itm))
if len(outItems) == n {
break
}
}
}
rspChan <- getResponse{
items: outItems,
}

case dequeue <- []byte(item):
item = ""
idle = false

case <-batchTimer.C:
if idle {
if inBuf.Len() != 0 {
Expand Down Expand Up @@ -414,3 +480,67 @@ func (q *DSQueue) commitInput(ctx context.Context, counter uint64, items *deque.

return nil
}

// readDatastore reads at most n items from the data store queue, in order, and
// appends them to items slice. Items are batch-deleted from the datastore as
// they are read. The modified items slice is returned.
func (q *DSQueue) readDatastore(ctx context.Context, n int, items [][]byte) ([][]byte, error) {
qry := query.Query{
KeysOnly: true,
Orders: []query.Order{query.OrderByKey{}},
Limit: n,
}
results, err := q.ds.Query(ctx, qry)
if err != nil {
return nil, err
}
defer results.Close()

batch, err := q.ds.Batch(ctx)
if err != nil {
return nil, fmt.Errorf("cannot create datastore batch: %w", err)
}
var delCount int

for result := range results.Next() {
if ctx.Err() != nil {
return nil, ctx.Err()
}
if result.Error != nil {
return nil, result.Error
}

if err = batch.Delete(ctx, datastore.NewKey(result.Key)); err != nil {
return nil, fmt.Errorf("error deleting queue item: %w", err)
}
delCount++

if delCount >= DefaultBufferSize {
delCount = 0
if err = batch.Commit(ctx); err != nil {
return nil, fmt.Errorf("cannot commit datastore updates: %w", err)
}
}

parts := strings.SplitN(strings.TrimPrefix(result.Key, "/"), "/", 2)
if len(parts) != 2 {
log.Errorw("malformed queued item, removing it from queue", "err", err, "key", result.Key, "qname", q.name)
continue
}
itemBin, err := base64.RawURLEncoding.DecodeString(parts[1])
if err != nil {
log.Errorw("error decoding queued item, removing it from queue", "err", err, "key", result.Key, "qname", q.name)
continue
}
items = append(items, itemBin)
}

if err = batch.Commit(ctx); err != nil {
return nil, fmt.Errorf("cannot commit datastore updated: %w", err)
}
if err = q.ds.Sync(ctx, datastore.NewKey("")); err != nil {
return nil, fmt.Errorf("cannot sync datastore: %w", err)
}

return items, nil
}
86 changes: 58 additions & 28 deletions dsqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

const dsqName = "testq"

func assertOrdered(cids []cid.Cid, q *dsqueue.DSQueue, t *testing.T) {
func assertOrdered(t *testing.T, cids []cid.Cid, q *dsqueue.DSQueue) {
t.Helper()

var count int
Expand Down Expand Up @@ -60,41 +60,33 @@ func TestBasicOperation(t *testing.T) {
case <-time.After(time.Millisecond):
}

items := []string{"apple", "banana", "cherry"}

out := make(chan []string)
go func() {
var outStrs []string
for {
select {
case dq, open := <-queue.Out():
if !open {
out <- outStrs
return
}
dqItem := string(dq)
t.Log("got:", dqItem)
outStrs = append(outStrs, dqItem)
for dq := range queue.Out() {
dqItem := string(dq)
outStrs = append(outStrs, dqItem)
if len(outStrs) == len(items) {
break
}
}
out <- outStrs
}()

items := []string{"apple", "banana", "cherry"}
for _, item := range items {
queue.Put([]byte(item))
}

time.Sleep(time.Second)
err := queue.Close()
if err != nil {
t.Fatal(err)
}

qout := <-out

if len(qout) != len(items) {
t.Fatalf("dequeued wrond number of items, expected %d, got %d", len(items), len(qout))
t.Fatalf("dequeued wrong number of items, expected %d, got %d", len(items), len(qout))
}

if err = queue.Close(); err != nil {
err := queue.Close()
if err != nil {
t.Fatal(err)
}

Expand All @@ -104,6 +96,45 @@ func TestBasicOperation(t *testing.T) {
}
}

func TestGetN(t *testing.T) {
ds := sync.MutexWrap(datastore.NewMapDatastore())
queue := dsqueue.New(ds, dsqName, dsqueue.WithBufferSize(5), dsqueue.WithDedupCacheSize(0))
defer queue.Close()

cids := random.Cids(29)
for _, c := range cids {
queue.Put(c.Bytes())
}

outItems, err := queue.GetN(50)
if err != nil {
t.Fatal(err)
}

if len(outItems) != len(cids) {
t.Fatalf("dequeued wrond number of items, expected %d, got %d", len(cids), len(outItems))
}

for i := range outItems {
outCid, err := cid.Parse(outItems[i])
if err != nil {
t.Fatal(err)
}
if outCid != cids[i] {
t.Fatal("retrieved items out of order")
}
}

outItems, err = queue.GetN(10)
if err != nil {
t.Fatal(err)
}

if len(outItems) != 0 {
t.Fatal("shoul not get anymore items from queue")
}
}

func TestMangledData(t *testing.T) {
ds := sync.MutexWrap(datastore.NewMapDatastore())

Expand All @@ -125,8 +156,7 @@ func TestMangledData(t *testing.T) {
}

// expect to only see the valid cids we entered
expected := cids
assertOrdered(expected, queue, t)
assertOrdered(t, cids, queue)
}

func TestInitialization(t *testing.T) {
Expand All @@ -139,7 +169,7 @@ func TestInitialization(t *testing.T) {
queue.Put(c.Bytes())
}

assertOrdered(cids[:5], queue, t)
assertOrdered(t, cids[:5], queue)

err := queue.Close()
if err != nil {
Expand All @@ -150,7 +180,7 @@ func TestInitialization(t *testing.T) {
queue = dsqueue.New(ds, dsqName)
defer queue.Close()

assertOrdered(cids[5:], queue, t)
assertOrdered(t, cids[5:], queue)
}

func TestIdleFlush(t *testing.T) {
Expand Down Expand Up @@ -230,7 +260,7 @@ func TestPersistManyCids(t *testing.T) {
queue = dsqueue.New(ds, dsqName)
defer queue.Close()

assertOrdered(cids, queue, t)
assertOrdered(t, cids, queue)
}

func TestPersistOneCid(t *testing.T) {
Expand All @@ -250,7 +280,7 @@ func TestPersistOneCid(t *testing.T) {
queue = dsqueue.New(ds, dsqName)
defer queue.Close()

assertOrdered(cids, queue, t)
assertOrdered(t, cids, queue)
}

func TestDeduplicateCids(t *testing.T) {
Expand All @@ -268,7 +298,7 @@ func TestDeduplicateCids(t *testing.T) {
queue.Put(cids[0].Bytes())
queue.Put(cids[4].Bytes())

assertOrdered(cids, queue, t)
assertOrdered(t, cids, queue)

// Test with dedup cache disabled.
queue = dsqueue.New(ds, dsqName, dsqueue.WithDedupCacheSize(-1))
Expand All @@ -278,7 +308,7 @@ func TestDeduplicateCids(t *testing.T) {
for _, c := range cids {
queue.Put(c.Bytes())
}
assertOrdered(cids, queue, t)
assertOrdered(t, cids, queue)
}

func TestClear(t *testing.T) {
Expand Down