Skip to content

Commit a12f639

Browse files
authored
Removing CID Lists Part 2: Deprecating protocols (#293)
* refactor(cidsets): remove cidsets * refactor(network): remove deprecated protocols * refactor(cidlists): delete cidlists
1 parent eddbf61 commit a12f639

38 files changed

+79
-3688
lines changed

benchmarks/testinstance/testinstance.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package testinstance
22

33
import (
44
"context"
5-
"os"
65
"time"
76

87
"github.com/ipfs/go-datastore"
@@ -165,8 +164,8 @@ func NewInstance(ctx context.Context, net tn.Network, tempDir string, diskBasedD
165164

166165
linkSystem := storeutil.LinkSystemForBlockstore(bstore)
167166
gs := gsimpl.New(ctx, gsNet, linkSystem, gsimpl.RejectAllRequestsByDefault())
168-
transport := gstransport.NewTransport(p, gs, dtNet)
169-
dt, err := dtimpl.NewDataTransfer(namespace.Wrap(dstore, datastore.NewKey("/data-transfers/transfers")), os.TempDir(), dtNet, transport)
167+
transport := gstransport.NewTransport(p, gs)
168+
dt, err := dtimpl.NewDataTransfer(namespace.Wrap(dstore, datastore.NewKey("/data-transfers/transfers")), dtNet, transport)
170169
if err != nil {
171170
return Instance{}, err
172171
}

channelmonitor/channelmonitor_test.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -610,14 +610,6 @@ func (m *mockChannelState) Stages() *datatransfer.ChannelStages {
610610
panic("implement me")
611611
}
612612

613-
func (m *mockChannelState) ReceivedCids() []cid.Cid {
614-
panic("implement me")
615-
}
616-
617-
func (m *mockChannelState) ReceivedCidsLen() int {
618-
panic("implement me")
619-
}
620-
621613
func (m *mockChannelState) ReceivedCidsTotal() int64 {
622614
panic("implement me")
623615
}

channels/channel_state.go

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ type channelState struct {
5757
voucherResults []internal.EncodedVoucherResult
5858
voucherResultDecoder DecoderByTypeFunc
5959
voucherDecoder DecoderByTypeFunc
60-
receivedCids ReceivedCidsReader
6160

6261
// stages tracks the timeline of events related to a data transfer, for
6362
// traceability purposes.
@@ -110,24 +109,6 @@ func (c channelState) Voucher() datatransfer.Voucher {
110109
return encodable.(datatransfer.Voucher)
111110
}
112111

113-
// ReceivedCids returns the cids received so far on this channel
114-
func (c channelState) ReceivedCids() []cid.Cid {
115-
receivedCids, err := c.receivedCids.ToArray(c.ChannelID())
116-
if err != nil {
117-
log.Error(err)
118-
}
119-
return receivedCids
120-
}
121-
122-
// ReceivedCids returns the number of unique cids received so far on this channel
123-
func (c channelState) ReceivedCidsLen() int {
124-
len, err := c.receivedCids.Len(c.ChannelID())
125-
if err != nil {
126-
log.Error(err)
127-
}
128-
return len
129-
}
130-
131112
// ReceivedCidsTotal returns the number of (non-unique) cids received so far
132113
// on the channel - note that a block can exist in more than one place in the DAG
133114
func (c channelState) ReceivedCidsTotal() int64 {
@@ -233,7 +214,7 @@ func (c channelState) Stages() *datatransfer.ChannelStages {
233214
return c.stages
234215
}
235216

236-
func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc, receivedCidsReader ReceivedCidsReader) datatransfer.ChannelState {
217+
func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc) datatransfer.ChannelState {
237218
return channelState{
238219
selfPeer: c.SelfPeer,
239220
isPull: c.Initiator == c.Recipient,
@@ -255,7 +236,6 @@ func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByT
255236
voucherResults: c.VoucherResults,
256237
voucherResultDecoder: voucherResultDecoder,
257238
voucherDecoder: voucherDecoder,
258-
receivedCids: receivedCidsReader,
259239
stages: c.Stages,
260240
missingCids: c.MissingCids,
261241
}

channels/channels.go

Lines changed: 2 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77

88
"github.com/ipfs/go-cid"
99
"github.com/ipfs/go-datastore"
10-
"github.com/ipfs/go-datastore/namespace"
1110
"github.com/ipld/go-ipld-prime"
1211
peer "github.com/libp2p/go-libp2p-core/peer"
1312
cbg "github.com/whyrusleeping/cbor-gen"
@@ -21,18 +20,11 @@ import (
2120
datatransfer "github.com/filecoin-project/go-data-transfer"
2221
"github.com/filecoin-project/go-data-transfer/channels/internal"
2322
"github.com/filecoin-project/go-data-transfer/channels/internal/migrations"
24-
"github.com/filecoin-project/go-data-transfer/cidlists"
25-
"github.com/filecoin-project/go-data-transfer/cidsets"
2623
"github.com/filecoin-project/go-data-transfer/encoding"
2724
)
2825

2926
type DecoderByTypeFunc func(identifier datatransfer.TypeIdentifier) (encoding.Decoder, bool)
3027

31-
type ReceivedCidsReader interface {
32-
ToArray(chid datatransfer.ChannelID) ([]cid.Cid, error)
33-
Len(chid datatransfer.ChannelID) (int, error)
34-
}
35-
3628
type Notifier func(datatransfer.Event, datatransfer.ChannelState)
3729

3830
// ErrNotFound is returned when a channel cannot be found with a given channel ID
@@ -59,7 +51,6 @@ type Channels struct {
5951
blockIndexCache *blockIndexCache
6052
stateMachines fsm.Group
6153
migrateStateMachines func(context.Context) error
62-
seenCIDs *cidsets.CIDSetManager
6354
}
6455

6556
// ChannelEnvironment -- just a proxy for DTNetwork for now
@@ -72,22 +63,19 @@ type ChannelEnvironment interface {
7263

7364
// New returns a new thread safe list of channels
7465
func New(ds datastore.Batching,
75-
cidLists cidlists.CIDLists,
7666
notifier Notifier,
7767
voucherDecoder DecoderByTypeFunc,
7868
voucherResultDecoder DecoderByTypeFunc,
7969
env ChannelEnvironment,
8070
selfPeer peer.ID) (*Channels, error) {
8171

82-
seenCIDsDS := namespace.Wrap(ds, datastore.NewKey("seencids"))
8372
c := &Channels{
84-
seenCIDs: cidsets.NewCIDSetManager(seenCIDsDS),
8573
notifier: notifier,
8674
voucherDecoder: voucherDecoder,
8775
voucherResultDecoder: voucherResultDecoder,
8876
}
8977
c.blockIndexCache = newBlockIndexCache()
90-
channelMigrations, err := migrations.GetChannelStateMigrations(selfPeer, cidLists)
78+
channelMigrations, err := migrations.GetChannelStateMigrations(selfPeer)
9179
if err != nil {
9280
return nil, err
9381
}
@@ -127,19 +115,6 @@ func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) {
127115
}
128116
log.Debugw("process data transfer listeners", "name", datatransfer.Events[evtCode], "transfer ID", realChannel.TransferID)
129117
c.notifier(evt, c.fromInternalChannelState(realChannel))
130-
131-
// When the channel has been cleaned up, remove the caches of seen cids
132-
if evt.Code == datatransfer.CleanupComplete {
133-
chid := datatransfer.ChannelID{
134-
Initiator: realChannel.Initiator,
135-
Responder: realChannel.Responder,
136-
ID: realChannel.TransferID,
137-
}
138-
err := c.removeSeenCIDCaches(chid)
139-
if err != nil {
140-
log.Errorf("failed to clean up channel %s: %s", err)
141-
}
142-
}
143118
}
144119

145120
// CreateNew creates a new channel id and channel state and saves to channels.
@@ -271,12 +246,6 @@ func (c *Channels) DataQueued(chid datatransfer.ChannelID, k cid.Cid, delta uint
271246
// Returns true if this is the first time the block has been received
272247
func (c *Channels) DataReceived(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64, unique bool) (bool, error) {
273248
new, err := c.fireProgressEvent(chid, datatransfer.DataReceived, datatransfer.DataReceivedProgress, k, delta, index, unique, c.getReceivedIndex)
274-
// TODO: remove when ReceivedCids and legacy protocol is removed
275-
// write the seen received cids, but write async in order to avoid blocking processing
276-
if err == nil {
277-
sid := seenCidsSetID(chid, datatransfer.DataReceived)
278-
_, _ = c.seenCIDs.InsertSetCID(sid, k)
279-
}
280249
return new, err
281250
}
282251

@@ -395,25 +364,6 @@ func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) {
395364
return c.stateMachines.Has(chid)
396365
}
397366

398-
// removeSeenCIDCaches cleans up the caches of "seen" blocks, ie
399-
// blocks that have already been queued / sent / received
400-
func (c *Channels) removeSeenCIDCaches(chid datatransfer.ChannelID) error {
401-
// Clean up seen block caches
402-
progressStates := []datatransfer.EventCode{
403-
datatransfer.DataQueued,
404-
datatransfer.DataSent,
405-
datatransfer.DataReceived,
406-
}
407-
for _, evt := range progressStates {
408-
sid := seenCidsSetID(chid, evt)
409-
err := c.seenCIDs.DeleteSet(sid)
410-
if err != nil {
411-
return err
412-
}
413-
}
414-
return nil
415-
}
416-
417367
// fireProgressEvent fires
418368
// - an event for queuing / sending / receiving blocks
419369
// - a corresponding "progress" event if the block has not been seen before
@@ -463,37 +413,7 @@ func (c *Channels) checkChannelExists(chid datatransfer.ChannelID, code datatran
463413
return nil
464414
}
465415

466-
// Get the ID of the CID set for the given channel ID and event code.
467-
// The CID set stores a unique list of queued / sent / received CIDs.
468-
func seenCidsSetID(chid datatransfer.ChannelID, evt datatransfer.EventCode) cidsets.SetID {
469-
return cidsets.SetID(chid.String() + "/" + datatransfer.Events[evt])
470-
}
471-
472416
// Convert from the internally used channel state format to the externally exposed ChannelState
473417
func (c *Channels) fromInternalChannelState(ch internal.ChannelState) datatransfer.ChannelState {
474-
rcr := &receivedCidsReader{seenCIDs: c.seenCIDs}
475-
return fromInternalChannelState(ch, c.voucherDecoder, c.voucherResultDecoder, rcr)
476-
}
477-
478-
// Implements the ReceivedCidsReader interface so that the internal channel
479-
// state has access to the received CIDs.
480-
// The interface is used (instead of passing these values directly)
481-
// so the values can be loaded lazily. Reading all CIDs from the datastore
482-
// is an expensive operation so we want to avoid doing it unless necessary.
483-
// Note that the received CIDs get cleaned up when the channel completes, so
484-
// these methods will return an empty array after that point.
485-
type receivedCidsReader struct {
486-
seenCIDs *cidsets.CIDSetManager
418+
return fromInternalChannelState(ch, c.voucherDecoder, c.voucherResultDecoder)
487419
}
488-
489-
func (r *receivedCidsReader) ToArray(chid datatransfer.ChannelID) ([]cid.Cid, error) {
490-
sid := seenCidsSetID(chid, datatransfer.DataReceived)
491-
return r.seenCIDs.SetToArray(sid)
492-
}
493-
494-
func (r *receivedCidsReader) Len(chid datatransfer.ChannelID) (int, error) {
495-
sid := seenCidsSetID(chid, datatransfer.DataReceived)
496-
return r.seenCIDs.SetLen(sid)
497-
}
498-
499-
var _ ReceivedCidsReader = (*receivedCidsReader)(nil)

0 commit comments

Comments
 (0)