@@ -24,7 +24,7 @@ func assertOrdered(cids []cid.Cid, q *dsqueue.DSQueue, t *testing.T) {
2424 var count int
2525 for i , c := range cids {
2626 select {
27- case dequeued , ok := <- q .Dequeue ():
27+ case dequeued , ok := <- q .Out ():
2828 if ! ok {
2929 t .Fatal ("queue closed" )
3030 }
@@ -53,29 +53,52 @@ func TestBasicOperation(t *testing.T) {
5353 t .Fatal ("wrong queue name" )
5454 }
5555
56- queue .Enqueue (nil )
56+ queue .Put (nil )
5757 select {
58- case <- queue .Dequeue ():
58+ case <- queue .Out ():
5959 t .Fatal ("nothing should be in queue" )
6060 case <- time .After (time .Millisecond ):
6161 }
6262
63- cids := random .Cids (10 )
64- for _ , c := range cids {
65- queue .Enqueue (c .Bytes ())
66- }
63+ out := make (chan []string )
64+ go func () {
65+ var outStrs []string
66+ for {
67+ select {
68+ case dq , open := <- queue .Out ():
69+ if ! open {
70+ out <- outStrs
71+ return
72+ }
73+ dqItem := string (dq )
74+ t .Log ("got:" , dqItem )
75+ outStrs = append (outStrs , dqItem )
76+ }
77+ }
78+ }()
6779
68- assertOrdered (cids , queue , t )
80+ items := []string {"apple" , "banana" , "cherry" }
81+ for _ , item := range items {
82+ queue .Put ([]byte (item ))
83+ }
6984
85+ time .Sleep (time .Second )
7086 err := queue .Close ()
7187 if err != nil {
7288 t .Fatal (err )
7389 }
90+
91+ qout := <- out
92+
93+ if len (qout ) != len (items ) {
94+ t .Fatalf ("dequeued wrond number of items, expected %d, got %d" , len (items ), len (qout ))
95+ }
96+
7497 if err = queue .Close (); err != nil {
7598 t .Fatal (err )
7699 }
77100
78- err = queue .Enqueue ( cids [ 0 ]. Bytes ( ))
101+ err = queue .Put ([] byte ( items [ 0 ] ))
79102 if err == nil {
80103 t .Fatal ("expected error calling Enqueue after Close" )
81104 }
@@ -98,7 +121,7 @@ func TestMangledData(t *testing.T) {
98121
99122 cids := random .Cids (10 )
100123 for _ , c := range cids {
101- queue .Enqueue (c .Bytes ())
124+ queue .Put (c .Bytes ())
102125 }
103126
104127 // expect to only see the valid cids we entered
@@ -113,7 +136,7 @@ func TestInitialization(t *testing.T) {
113136
114137 cids := random .Cids (10 )
115138 for _ , c := range cids {
116- queue .Enqueue (c .Bytes ())
139+ queue .Put (c .Bytes ())
117140 }
118141
119142 assertOrdered (cids [:5 ], queue , t )
@@ -137,7 +160,7 @@ func TestIdleFlush(t *testing.T) {
137160
138161 cids := random .Cids (10 )
139162 for _ , c := range cids {
140- queue .Enqueue (c .Bytes ())
163+ queue .Put (c .Bytes ())
141164 }
142165
143166 dsn := namespace .Wrap (ds , datastore .NewKey ("/dsq-" + dsqName ))
@@ -195,7 +218,7 @@ func TestPersistManyCids(t *testing.T) {
195218
196219 cids := random .Cids (25 )
197220 for _ , c := range cids {
198- queue .Enqueue (c .Bytes ())
221+ queue .Put (c .Bytes ())
199222 }
200223
201224 err := queue .Close ()
@@ -216,7 +239,7 @@ func TestPersistOneCid(t *testing.T) {
216239 defer queue .Close ()
217240
218241 cids := random .Cids (1 )
219- queue .Enqueue (cids [0 ].Bytes ())
242+ queue .Put (cids [0 ].Bytes ())
220243
221244 err := queue .Close ()
222245 if err != nil {
@@ -236,14 +259,14 @@ func TestDeduplicateCids(t *testing.T) {
236259 defer queue .Close ()
237260
238261 cids := random .Cids (5 )
239- queue .Enqueue (cids [0 ].Bytes ())
240- queue .Enqueue (cids [0 ].Bytes ())
241- queue .Enqueue (cids [1 ].Bytes ())
242- queue .Enqueue (cids [2 ].Bytes ())
243- queue .Enqueue (cids [1 ].Bytes ())
244- queue .Enqueue (cids [3 ].Bytes ())
245- queue .Enqueue (cids [0 ].Bytes ())
246- queue .Enqueue (cids [4 ].Bytes ())
262+ queue .Put (cids [0 ].Bytes ())
263+ queue .Put (cids [0 ].Bytes ())
264+ queue .Put (cids [1 ].Bytes ())
265+ queue .Put (cids [2 ].Bytes ())
266+ queue .Put (cids [1 ].Bytes ())
267+ queue .Put (cids [3 ].Bytes ())
268+ queue .Put (cids [0 ].Bytes ())
269+ queue .Put (cids [4 ].Bytes ())
247270
248271 assertOrdered (cids , queue , t )
249272
@@ -253,7 +276,7 @@ func TestDeduplicateCids(t *testing.T) {
253276
254277 cids = append (cids , cids [0 ], cids [0 ], cids [1 ])
255278 for _ , c := range cids {
256- queue .Enqueue (c .Bytes ())
279+ queue .Put (c .Bytes ())
257280 }
258281 assertOrdered (cids , queue , t )
259282}
@@ -266,7 +289,7 @@ func TestClear(t *testing.T) {
266289 defer queue .Close ()
267290
268291 for _ , c := range random .Cids (cidCount ) {
269- queue .Enqueue (c .Bytes ())
292+ queue .Put (c .Bytes ())
270293 }
271294
272295 // Cause queued entried to be saved in datastore.
@@ -279,7 +302,7 @@ func TestClear(t *testing.T) {
279302 defer queue .Close ()
280303
281304 for _ , c := range random .Cids (cidCount ) {
282- queue .Enqueue (c .Bytes ())
305+ queue .Put (c .Bytes ())
283306 }
284307
285308 rmCount := queue .Clear ()
@@ -297,7 +320,7 @@ func TestClear(t *testing.T) {
297320 defer queue .Close ()
298321
299322 select {
300- case <- queue .Dequeue ():
323+ case <- queue .Out ():
301324 t .Fatal ("dequeue should not return" )
302325 case <- time .After (10 * time .Millisecond ):
303326 }
@@ -314,7 +337,7 @@ func TestCloseTimeout(t *testing.T) {
314337
315338 cids := random .Cids (5 )
316339 for _ , c := range cids {
317- queue .Enqueue (c .Bytes ())
340+ queue .Put (c .Bytes ())
318341 }
319342
320343 err := queue .Close ()
@@ -331,7 +354,7 @@ func TestCloseTimeout(t *testing.T) {
331354 defer queue .Close ()
332355
333356 for _ , c := range cids {
334- queue .Enqueue (c .Bytes ())
357+ queue .Put (c .Bytes ())
335358 }
336359 if err = queue .Close (); err != nil {
337360 t .Fatal (err )
0 commit comments