1
1
package channels_test
2
2
3
3
import (
4
- "bytes"
5
4
"context"
6
5
"errors"
7
- "math/rand"
8
- "os"
9
6
"testing"
10
7
"time"
11
8
12
9
"github.com/ipfs/go-cid"
13
10
"github.com/ipfs/go-datastore"
14
11
dss "github.com/ipfs/go-datastore/sync"
15
- "github.com/ipld/go-ipld-prime/codec/dagcbor"
16
12
basicnode "github.com/ipld/go-ipld-prime/node/basic"
17
13
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
18
14
peer "github.com/libp2p/go-libp2p-core/peer"
19
15
"github.com/stretchr/testify/require"
20
- cbg "github.com/whyrusleeping/cbor-gen"
21
16
"golang.org/x/xerrors"
22
17
23
- versioning "github.com/filecoin-project/go-ds-versioning/pkg"
24
- versionedds "github.com/filecoin-project/go-ds-versioning/pkg/datastore"
25
-
26
18
datatransfer "github.com/filecoin-project/go-data-transfer"
27
19
"github.com/filecoin-project/go-data-transfer/channels"
28
- "github.com/filecoin-project/go-data-transfer/channels/internal"
29
- "github.com/filecoin-project/go-data-transfer/channels/internal/migrations"
30
- v0 "github.com/filecoin-project/go-data-transfer/channels/internal/migrations/v0"
31
- v1 "github.com/filecoin-project/go-data-transfer/channels/internal/migrations/v1"
32
- "github.com/filecoin-project/go-data-transfer/cidlists"
33
20
"github.com/filecoin-project/go-data-transfer/encoding"
34
21
"github.com/filecoin-project/go-data-transfer/testutil"
35
22
)
@@ -52,10 +39,7 @@ func TestChannels(t *testing.T) {
52
39
selector := builder .NewSelectorSpecBuilder (basicnode .Prototype .Any ).Matcher ().Node ()
53
40
peers := testutil .GeneratePeers (4 )
54
41
55
- dir := os .TempDir ()
56
- cidLists , err := cidlists .NewCIDLists (dir )
57
- require .NoError (t , err )
58
- channelList , err := channels .New (ds , cidLists , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
42
+ channelList , err := channels .New (ds , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
59
43
require .NoError (t , err )
60
44
61
45
err = channelList .Start (ctx )
@@ -140,10 +124,8 @@ func TestChannels(t *testing.T) {
140
124
141
125
t .Run ("datasent/queued when transfer is already finished" , func (t * testing.T ) {
142
126
ds := dss .MutexWrap (datastore .NewMapDatastore ())
143
- dir := os .TempDir ()
144
- cidLists , err := cidlists .NewCIDLists (dir )
145
- require .NoError (t , err )
146
- channelList , err := channels .New (ds , cidLists , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
127
+
128
+ channelList , err := channels .New (ds , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
147
129
require .NoError (t , err )
148
130
err = channelList .Start (ctx )
149
131
require .NoError (t , err )
@@ -172,10 +154,8 @@ func TestChannels(t *testing.T) {
172
154
173
155
t .Run ("updating send/receive values" , func (t * testing.T ) {
174
156
ds := dss .MutexWrap (datastore .NewMapDatastore ())
175
- dir := os .TempDir ()
176
- cidLists , err := cidlists .NewCIDLists (dir )
177
- require .NoError (t , err )
178
- channelList , err := channels .New (ds , cidLists , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
157
+
158
+ channelList , err := channels .New (ds , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
179
159
require .NoError (t , err )
180
160
err = channelList .Start (ctx )
181
161
require .NoError (t , err )
@@ -244,10 +224,8 @@ func TestChannels(t *testing.T) {
244
224
245
225
t .Run ("missing cids" , func (t * testing.T ) {
246
226
ds := dss .MutexWrap (datastore .NewMapDatastore ())
247
- dir := os .TempDir ()
248
- cidLists , err := cidlists .NewCIDLists (dir )
249
- require .NoError (t , err )
250
- channelList , err := channels .New (ds , cidLists , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
227
+
228
+ channelList , err := channels .New (ds , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
251
229
require .NoError (t , err )
252
230
err = channelList .Start (ctx )
253
231
require .NoError (t , err )
@@ -390,10 +368,7 @@ func TestChannels(t *testing.T) {
390
368
notifier := func (evt datatransfer.Event , chst datatransfer.ChannelState ) {
391
369
received <- event {evt , chst }
392
370
}
393
- dir := os .TempDir ()
394
- cidLists , err := cidlists .NewCIDLists (dir )
395
- require .NoError (t , err )
396
- channelList , err := channels .New (ds , cidLists , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
371
+ channelList , err := channels .New (ds , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
397
372
require .NoError (t , err )
398
373
err = channelList .Start (ctx )
399
374
require .NoError (t , err )
@@ -443,257 +418,6 @@ func TestIsChannelCleaningUp(t *testing.T) {
443
418
require .False (t , channels .IsChannelCleaningUp (datatransfer .Cancelled ))
444
419
}
445
420
446
- func TestMigrationsV0 (t * testing.T ) {
447
- ctx := context .Background ()
448
- ctx , cancel := context .WithTimeout (ctx , 2 * time .Second )
449
- defer cancel ()
450
-
451
- ds := dss .MutexWrap (datastore .NewMapDatastore ())
452
- received := make (chan event )
453
- notifier := func (evt datatransfer.Event , chst datatransfer.ChannelState ) {
454
- received <- event {evt , chst }
455
- }
456
- numChannels := 5
457
- transferIDs := make ([]datatransfer.TransferID , numChannels )
458
- initiators := make ([]peer.ID , numChannels )
459
- responders := make ([]peer.ID , numChannels )
460
- baseCids := make ([]cid.Cid , numChannels )
461
-
462
- totalSizes := make ([]uint64 , numChannels )
463
- sents := make ([]uint64 , numChannels )
464
- receiveds := make ([]uint64 , numChannels )
465
- messages := make ([]string , numChannels )
466
- vouchers := make ([]datatransfer.Voucher , numChannels )
467
- voucherResults := make ([]datatransfer.VoucherResult , numChannels )
468
-
469
- allSelector := builder .NewSelectorSpecBuilder (basicnode .Prototype .Any ).Matcher ().Node ()
470
- allSelectorBuf := new (bytes.Buffer )
471
- err := dagcbor .Encode (allSelector , allSelectorBuf )
472
- require .NoError (t , err )
473
- allSelectorBytes := allSelectorBuf .Bytes ()
474
-
475
- for i := 0 ; i < numChannels ; i ++ {
476
- transferIDs [i ] = datatransfer .TransferID (rand .Uint64 ())
477
- initiators [i ] = testutil .GeneratePeers (1 )[0 ]
478
- responders [i ] = testutil .GeneratePeers (1 )[0 ]
479
- baseCids [i ] = testutil .GenerateCids (1 )[0 ]
480
- totalSizes [i ] = rand .Uint64 ()
481
- sents [i ] = rand .Uint64 ()
482
- receiveds [i ] = rand .Uint64 ()
483
- messages [i ] = string (testutil .RandomBytes (20 ))
484
- vouchers [i ] = testutil .NewFakeDTType ()
485
- vBytes , err := encoding .Encode (vouchers [i ])
486
- require .NoError (t , err )
487
- voucherResults [i ] = testutil .NewFakeDTType ()
488
- vrBytes , err := encoding .Encode (voucherResults [i ])
489
- require .NoError (t , err )
490
- channel := v0.ChannelState {
491
- TransferID : transferIDs [i ],
492
- Initiator : initiators [i ],
493
- Responder : responders [i ],
494
- BaseCid : baseCids [i ],
495
- Selector : & cbg.Deferred {
496
- Raw : allSelectorBytes ,
497
- },
498
- Sender : initiators [i ],
499
- Recipient : responders [i ],
500
- TotalSize : totalSizes [i ],
501
- Status : datatransfer .Ongoing ,
502
- Sent : sents [i ],
503
- Received : receiveds [i ],
504
- Message : messages [i ],
505
- Vouchers : []v0.EncodedVoucher {
506
- {
507
- Type : vouchers [i ].Type (),
508
- Voucher : & cbg.Deferred {
509
- Raw : vBytes ,
510
- },
511
- },
512
- },
513
- VoucherResults : []v0.EncodedVoucherResult {
514
- {
515
- Type : voucherResults [i ].Type (),
516
- VoucherResult : & cbg.Deferred {
517
- Raw : vrBytes ,
518
- },
519
- },
520
- },
521
- }
522
- buf := new (bytes.Buffer )
523
- err = channel .MarshalCBOR (buf )
524
- require .NoError (t , err )
525
- err = ds .Put (ctx , datastore .NewKey (datatransfer.ChannelID {
526
- Initiator : initiators [i ],
527
- Responder : responders [i ],
528
- ID : transferIDs [i ],
529
- }.String ()), buf .Bytes ())
530
- require .NoError (t , err )
531
- }
532
-
533
- selfPeer := testutil .GeneratePeers (1 )[0 ]
534
- dir := os .TempDir ()
535
- cidLists , err := cidlists .NewCIDLists (dir )
536
- require .NoError (t , err )
537
- channelList , err := channels .New (ds , cidLists , notifier , decoderByType , decoderByType , & fakeEnv {}, selfPeer )
538
- require .NoError (t , err )
539
- err = channelList .Start (ctx )
540
- require .NoError (t , err )
541
-
542
- for i := 0 ; i < numChannels ; i ++ {
543
-
544
- channel , err := channelList .GetByID (ctx , datatransfer.ChannelID {
545
- Initiator : initiators [i ],
546
- Responder : responders [i ],
547
- ID : transferIDs [i ],
548
- })
549
- require .NoError (t , err )
550
- require .Equal (t , selfPeer , channel .SelfPeer ())
551
- require .Equal (t , transferIDs [i ], channel .TransferID ())
552
- require .Equal (t , baseCids [i ], channel .BaseCID ())
553
- require .Equal (t , allSelector , channel .Selector ())
554
- require .Equal (t , initiators [i ], channel .Sender ())
555
- require .Equal (t , responders [i ], channel .Recipient ())
556
- require .Equal (t , totalSizes [i ], channel .TotalSize ())
557
- require .Equal (t , datatransfer .Ongoing , channel .Status ())
558
- require .Equal (t , sents [i ], channel .Sent ())
559
- require .Equal (t , receiveds [i ], channel .Received ())
560
- require .Equal (t , messages [i ], channel .Message ())
561
- require .Equal (t , vouchers [i ], channel .LastVoucher ())
562
- require .Equal (t , voucherResults [i ], channel .LastVoucherResult ())
563
- }
564
- }
565
- func TestMigrationsV1 (t * testing.T ) {
566
- ctx := context .Background ()
567
- ctx , cancel := context .WithTimeout (ctx , 2 * time .Second )
568
- defer cancel ()
569
-
570
- ds := dss .MutexWrap (datastore .NewMapDatastore ())
571
- received := make (chan event )
572
- notifier := func (evt datatransfer.Event , chst datatransfer.ChannelState ) {
573
- received <- event {evt , chst }
574
- }
575
- numChannels := 5
576
- transferIDs := make ([]datatransfer.TransferID , numChannels )
577
- initiators := make ([]peer.ID , numChannels )
578
- responders := make ([]peer.ID , numChannels )
579
- baseCids := make ([]cid.Cid , numChannels )
580
-
581
- totalSizes := make ([]uint64 , numChannels )
582
- sents := make ([]uint64 , numChannels )
583
- receiveds := make ([]uint64 , numChannels )
584
- messages := make ([]string , numChannels )
585
- vouchers := make ([]datatransfer.Voucher , numChannels )
586
- voucherResults := make ([]datatransfer.VoucherResult , numChannels )
587
- receivedCids := make ([][]cid.Cid , numChannels )
588
- allSelector := builder .NewSelectorSpecBuilder (basicnode .Prototype .Any ).Matcher ().Node ()
589
- allSelectorBuf := new (bytes.Buffer )
590
- err := dagcbor .Encode (allSelector , allSelectorBuf )
591
- require .NoError (t , err )
592
- allSelectorBytes := allSelectorBuf .Bytes ()
593
- selfPeer := testutil .GeneratePeers (1 )[0 ]
594
- dir := os .TempDir ()
595
- cidLists , err := cidlists .NewCIDLists (dir )
596
- require .NoError (t , err )
597
-
598
- list , err := migrations .GetChannelStateMigrations (selfPeer , cidLists )
599
- require .NoError (t , err )
600
- vds , up := versionedds .NewVersionedDatastore (ds , list , versioning .VersionKey ("1" ))
601
- require .NoError (t , up (ctx ))
602
-
603
- for i := 0 ; i < numChannels ; i ++ {
604
- transferIDs [i ] = datatransfer .TransferID (rand .Uint64 ())
605
- initiators [i ] = testutil .GeneratePeers (1 )[0 ]
606
- responders [i ] = testutil .GeneratePeers (1 )[0 ]
607
- baseCids [i ] = testutil .GenerateCids (1 )[0 ]
608
- totalSizes [i ] = rand .Uint64 ()
609
- sents [i ] = rand .Uint64 ()
610
- receiveds [i ] = rand .Uint64 ()
611
- messages [i ] = string (testutil .RandomBytes (20 ))
612
- vouchers [i ] = testutil .NewFakeDTType ()
613
- vBytes , err := encoding .Encode (vouchers [i ])
614
- require .NoError (t , err )
615
- voucherResults [i ] = testutil .NewFakeDTType ()
616
- vrBytes , err := encoding .Encode (voucherResults [i ])
617
- require .NoError (t , err )
618
- receivedCids [i ] = testutil .GenerateCids (100 )
619
- channel := v1.ChannelState {
620
- TransferID : transferIDs [i ],
621
- Initiator : initiators [i ],
622
- Responder : responders [i ],
623
- BaseCid : baseCids [i ],
624
- Selector : & cbg.Deferred {
625
- Raw : allSelectorBytes ,
626
- },
627
- Sender : initiators [i ],
628
- Recipient : responders [i ],
629
- TotalSize : totalSizes [i ],
630
- Status : datatransfer .Ongoing ,
631
- Sent : sents [i ],
632
- Received : receiveds [i ],
633
- Message : messages [i ],
634
- Vouchers : []internal.EncodedVoucher {
635
- {
636
- Type : vouchers [i ].Type (),
637
- Voucher : & cbg.Deferred {
638
- Raw : vBytes ,
639
- },
640
- },
641
- },
642
- VoucherResults : []internal.EncodedVoucherResult {
643
- {
644
- Type : voucherResults [i ].Type (),
645
- VoucherResult : & cbg.Deferred {
646
- Raw : vrBytes ,
647
- },
648
- },
649
- },
650
- SelfPeer : selfPeer ,
651
- ReceivedCids : receivedCids [i ],
652
- }
653
- buf := new (bytes.Buffer )
654
- err = channel .MarshalCBOR (buf )
655
- require .NoError (t , err )
656
- err = vds .Put (ctx , datastore .NewKey (datatransfer.ChannelID {
657
- Initiator : initiators [i ],
658
- Responder : responders [i ],
659
- ID : transferIDs [i ],
660
- }.String ()), buf .Bytes ())
661
- require .NoError (t , err )
662
- }
663
-
664
- channelList , err := channels .New (ds , cidLists , notifier , decoderByType , decoderByType , & fakeEnv {}, selfPeer )
665
- require .NoError (t , err )
666
- err = channelList .Start (ctx )
667
- require .NoError (t , err )
668
-
669
- for i := 0 ; i < numChannels ; i ++ {
670
-
671
- channel , err := channelList .GetByID (ctx , datatransfer.ChannelID {
672
- Initiator : initiators [i ],
673
- Responder : responders [i ],
674
- ID : transferIDs [i ],
675
- })
676
- require .NoError (t , err )
677
- require .Equal (t , selfPeer , channel .SelfPeer ())
678
- require .Equal (t , transferIDs [i ], channel .TransferID ())
679
- require .Equal (t , baseCids [i ], channel .BaseCID ())
680
- require .Equal (t , allSelector , channel .Selector ())
681
- require .Equal (t , initiators [i ], channel .Sender ())
682
- require .Equal (t , responders [i ], channel .Recipient ())
683
- require .Equal (t , totalSizes [i ], channel .TotalSize ())
684
- require .Equal (t , datatransfer .Ongoing , channel .Status ())
685
- require .Equal (t , sents [i ], channel .Sent ())
686
- require .Equal (t , receiveds [i ], channel .Received ())
687
- require .Equal (t , messages [i ], channel .Message ())
688
- require .Equal (t , vouchers [i ], channel .LastVoucher ())
689
- require .Equal (t , voucherResults [i ], channel .LastVoucherResult ())
690
- // No longer relying on this migration to migrate CID lists as they
691
- // have been deprecated since we moved to CID sets:
692
- // https://github.com/filecoin-project/go-data-transfer/pull/217
693
- //require.Equal(t, receivedCids[i], channel.ReceivedCids())
694
- }
695
- }
696
-
697
421
type event struct {
698
422
event datatransfer.Event
699
423
state datatransfer.ChannelState
0 commit comments