Skip to content

Removing CID Lists Part 2: Deprecating protocols #293

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions benchmarks/testinstance/testinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package testinstance

import (
"context"
"os"
"time"

"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -165,8 +164,8 @@ func NewInstance(ctx context.Context, net tn.Network, tempDir string, diskBasedD

linkSystem := storeutil.LinkSystemForBlockstore(bstore)
gs := gsimpl.New(ctx, gsNet, linkSystem, gsimpl.RejectAllRequestsByDefault())
transport := gstransport.NewTransport(p, gs, dtNet)
dt, err := dtimpl.NewDataTransfer(namespace.Wrap(dstore, datastore.NewKey("/data-transfers/transfers")), os.TempDir(), dtNet, transport)
transport := gstransport.NewTransport(p, gs)
dt, err := dtimpl.NewDataTransfer(namespace.Wrap(dstore, datastore.NewKey("/data-transfers/transfers")), dtNet, transport)
if err != nil {
return Instance{}, err
}
Expand Down
8 changes: 0 additions & 8 deletions channelmonitor/channelmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,14 +610,6 @@ func (m *mockChannelState) Stages() *datatransfer.ChannelStages {
panic("implement me")
}

func (m *mockChannelState) ReceivedCids() []cid.Cid {
panic("implement me")
}

func (m *mockChannelState) ReceivedCidsLen() int {
panic("implement me")
}

func (m *mockChannelState) ReceivedCidsTotal() int64 {
panic("implement me")
}
Expand Down
22 changes: 1 addition & 21 deletions channels/channel_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ type channelState struct {
voucherResults []internal.EncodedVoucherResult
voucherResultDecoder DecoderByTypeFunc
voucherDecoder DecoderByTypeFunc
receivedCids ReceivedCidsReader

// stages tracks the timeline of events related to a data transfer, for
// traceability purposes.
Expand Down Expand Up @@ -110,24 +109,6 @@ func (c channelState) Voucher() datatransfer.Voucher {
return encodable.(datatransfer.Voucher)
}

// ReceivedCids returns the cids received so far on this channel
func (c channelState) ReceivedCids() []cid.Cid {
receivedCids, err := c.receivedCids.ToArray(c.ChannelID())
if err != nil {
log.Error(err)
}
return receivedCids
}

// ReceivedCids returns the number of unique cids received so far on this channel
func (c channelState) ReceivedCidsLen() int {
len, err := c.receivedCids.Len(c.ChannelID())
if err != nil {
log.Error(err)
}
return len
}

// ReceivedCidsTotal returns the number of (non-unique) cids received so far
// on the channel - note that a block can exist in more than one place in the DAG
func (c channelState) ReceivedCidsTotal() int64 {
Expand Down Expand Up @@ -233,7 +214,7 @@ func (c channelState) Stages() *datatransfer.ChannelStages {
return c.stages
}

func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc, receivedCidsReader ReceivedCidsReader) datatransfer.ChannelState {
func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc) datatransfer.ChannelState {
return channelState{
selfPeer: c.SelfPeer,
isPull: c.Initiator == c.Recipient,
Expand All @@ -255,7 +236,6 @@ func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByT
voucherResults: c.VoucherResults,
voucherResultDecoder: voucherResultDecoder,
voucherDecoder: voucherDecoder,
receivedCids: receivedCidsReader,
stages: c.Stages,
missingCids: c.MissingCids,
}
Expand Down
84 changes: 2 additions & 82 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
Expand All @@ -21,18 +20,11 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channels/internal"
"github.com/filecoin-project/go-data-transfer/channels/internal/migrations"
"github.com/filecoin-project/go-data-transfer/cidlists"
"github.com/filecoin-project/go-data-transfer/cidsets"
"github.com/filecoin-project/go-data-transfer/encoding"
)

type DecoderByTypeFunc func(identifier datatransfer.TypeIdentifier) (encoding.Decoder, bool)

type ReceivedCidsReader interface {
ToArray(chid datatransfer.ChannelID) ([]cid.Cid, error)
Len(chid datatransfer.ChannelID) (int, error)
}

type Notifier func(datatransfer.Event, datatransfer.ChannelState)

// ErrNotFound is returned when a channel cannot be found with a given channel ID
Expand All @@ -59,7 +51,6 @@ type Channels struct {
blockIndexCache *blockIndexCache
stateMachines fsm.Group
migrateStateMachines func(context.Context) error
seenCIDs *cidsets.CIDSetManager
}

// ChannelEnvironment -- just a proxy for DTNetwork for now
Expand All @@ -72,22 +63,19 @@ type ChannelEnvironment interface {

// New returns a new thread safe list of channels
func New(ds datastore.Batching,
cidLists cidlists.CIDLists,
notifier Notifier,
voucherDecoder DecoderByTypeFunc,
voucherResultDecoder DecoderByTypeFunc,
env ChannelEnvironment,
selfPeer peer.ID) (*Channels, error) {

seenCIDsDS := namespace.Wrap(ds, datastore.NewKey("seencids"))
c := &Channels{
seenCIDs: cidsets.NewCIDSetManager(seenCIDsDS),
notifier: notifier,
voucherDecoder: voucherDecoder,
voucherResultDecoder: voucherResultDecoder,
}
c.blockIndexCache = newBlockIndexCache()
channelMigrations, err := migrations.GetChannelStateMigrations(selfPeer, cidLists)
channelMigrations, err := migrations.GetChannelStateMigrations(selfPeer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -127,19 +115,6 @@ func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) {
}
log.Debugw("process data transfer listeners", "name", datatransfer.Events[evtCode], "transfer ID", realChannel.TransferID)
c.notifier(evt, c.fromInternalChannelState(realChannel))

// When the channel has been cleaned up, remove the caches of seen cids
if evt.Code == datatransfer.CleanupComplete {
chid := datatransfer.ChannelID{
Initiator: realChannel.Initiator,
Responder: realChannel.Responder,
ID: realChannel.TransferID,
}
err := c.removeSeenCIDCaches(chid)
if err != nil {
log.Errorf("failed to clean up channel %s: %s", err)
}
}
}

// CreateNew creates a new channel id and channel state and saves to channels.
Expand Down Expand Up @@ -271,12 +246,6 @@ func (c *Channels) DataQueued(chid datatransfer.ChannelID, k cid.Cid, delta uint
// Returns true if this is the first time the block has been received
func (c *Channels) DataReceived(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64, unique bool) (bool, error) {
new, err := c.fireProgressEvent(chid, datatransfer.DataReceived, datatransfer.DataReceivedProgress, k, delta, index, unique, c.getReceivedIndex)
// TODO: remove when ReceivedCids and legacy protocol is removed
// write the seen received cids, but write async in order to avoid blocking processing
if err == nil {
sid := seenCidsSetID(chid, datatransfer.DataReceived)
_, _ = c.seenCIDs.InsertSetCID(sid, k)
}
return new, err
}

Expand Down Expand Up @@ -395,25 +364,6 @@ func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) {
return c.stateMachines.Has(chid)
}

// removeSeenCIDCaches cleans up the caches of "seen" blocks, ie
// blocks that have already been queued / sent / received
func (c *Channels) removeSeenCIDCaches(chid datatransfer.ChannelID) error {
// Clean up seen block caches
progressStates := []datatransfer.EventCode{
datatransfer.DataQueued,
datatransfer.DataSent,
datatransfer.DataReceived,
}
for _, evt := range progressStates {
sid := seenCidsSetID(chid, evt)
err := c.seenCIDs.DeleteSet(sid)
if err != nil {
return err
}
}
return nil
}

// fireProgressEvent fires
// - an event for queuing / sending / receiving blocks
// - a corresponding "progress" event if the block has not been seen before
Expand Down Expand Up @@ -463,37 +413,7 @@ func (c *Channels) checkChannelExists(chid datatransfer.ChannelID, code datatran
return nil
}

// Get the ID of the CID set for the given channel ID and event code.
// The CID set stores a unique list of queued / sent / received CIDs.
func seenCidsSetID(chid datatransfer.ChannelID, evt datatransfer.EventCode) cidsets.SetID {
return cidsets.SetID(chid.String() + "/" + datatransfer.Events[evt])
}

// Convert from the internally used channel state format to the externally exposed ChannelState
func (c *Channels) fromInternalChannelState(ch internal.ChannelState) datatransfer.ChannelState {
rcr := &receivedCidsReader{seenCIDs: c.seenCIDs}
return fromInternalChannelState(ch, c.voucherDecoder, c.voucherResultDecoder, rcr)
}

// Implements the ReceivedCidsReader interface so that the internal channel
// state has access to the received CIDs.
// The interface is used (instead of passing these values directly)
// so the values can be loaded lazily. Reading all CIDs from the datastore
// is an expensive operation so we want to avoid doing it unless necessary.
// Note that the received CIDs get cleaned up when the channel completes, so
// these methods will return an empty array after that point.
type receivedCidsReader struct {
seenCIDs *cidsets.CIDSetManager
return fromInternalChannelState(ch, c.voucherDecoder, c.voucherResultDecoder)
}

func (r *receivedCidsReader) ToArray(chid datatransfer.ChannelID) ([]cid.Cid, error) {
sid := seenCidsSetID(chid, datatransfer.DataReceived)
return r.seenCIDs.SetToArray(sid)
}

func (r *receivedCidsReader) Len(chid datatransfer.ChannelID) (int, error) {
sid := seenCidsSetID(chid, datatransfer.DataReceived)
return r.seenCIDs.SetLen(sid)
}

var _ ReceivedCidsReader = (*receivedCidsReader)(nil)
Loading