Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package api

import (
"context"
"sync"

"github.com/ipfs-shipyard/w3rc"
w3rcfetcher "github.com/ipfs-shipyard/w3rc/fetcher"
"github.com/ipfs-shipyard/w3rc/gateway"
"github.com/ipfs-shipyard/w3rc/store"
"github.com/ipfs/go-fetcher"
"github.com/ipfs/go-unixfsnode"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage"
)

// implementation of the gateway api using w3rc

type api struct {
opts []w3rc.Option
baseReader storage.ReadableStorage
baseWriter storage.WritableStorage

sessionLk sync.Mutex
sessions map[*ipld.LinkSystem]context.Context
}

func NewAPI(cacheSizeBytes uint64, opts ...w3rc.Option) gateway.API {
cacheBase := store.NewCachingStore(cacheSizeBytes)
a := api{
opts: opts,
baseReader: cacheBase,
baseWriter: cacheBase,
sessions: make(map[*ipld.LinkSystem]context.Context),
}
return &a
}

func (a *api) FetcherForSession(ls *ipld.LinkSystem) fetcher.Fetcher {
fc := w3rcfetcher.FetcherConfig{
LinkSystem: ls,
PrototypeChooser: w3rcfetcher.DefaultPrototypeChooser,
Options: a.opts,
}

a.sessionLk.Lock()
defer a.sessionLk.Unlock()
ctx, ok := a.sessions[ls]
if !ok {
// todo: warn
return fc.NewSession(context.TODO())
}
return fc.NewSession(ctx)
}

func (a *api) NewSession(ctx context.Context) *ipld.LinkSystem {
ls := cidlink.DefaultLinkSystem()
derivedReadStore, derivedWriteStore := store.NewWriteThroughStore(a.baseReader, a.baseWriter)
ls.KnownReifiers = make(map[string]linking.NodeReifier)
ls.KnownReifiers["unixfs"] = unixfsnode.Reify
ls.SetReadStorage(derivedReadStore)
ls.SetWriteStorage(derivedWriteStore)
a.sessionLk.Lock()
defer a.sessionLk.Unlock()
a.sessions[&ls] = ctx

go func() {
<-ctx.Done()
a.sessionLk.Lock()
defer a.sessionLk.Unlock()
delete(a.sessions, &ls)
}()

return &ls
}
24 changes: 24 additions & 0 deletions cmd/w3r/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,30 @@ func main1() int {
&cli.StringFlag{
Name: "indexer",
Usage: "query a specific indexer endpoint",
Value: "https://cid.contact",
},
&cli.BoolFlag{
Name: "verbose",
Aliases: []string{"v"},
Usage: "verbose output",
},
},
},
{
Name: "serve",
Usage: "Serve CIDs",
Action: Serve,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "listen",
Usage: "listen on this address",
Aliases: []string{"l"},
Value: ":8080",
},
&cli.StringFlag{
Name: "indexer",
Usage: "query a specific indexer endpoint",
Value: "https://cid.contact",
},
&cli.BoolFlag{
Name: "verbose",
Expand Down
67 changes: 67 additions & 0 deletions cmd/w3r/serve.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package main

import (
"context"
"net"
"os"
"os/signal"
"syscall"
"time"

"github.com/ipfs-shipyard/w3rc"
"github.com/ipfs-shipyard/w3rc/api"
"github.com/ipfs-shipyard/w3rc/gateway"
"github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
)

// Serve content over HTTP
func Serve(c *cli.Context) error {
if c.Bool("verbose") {
log.SetLogLevel("*", "debug")
}
opts := []w3rc.Option{}
if c.IsSet("indexer") {
opts = append(opts, w3rc.WithIndexer(c.String("indexer")))
}

signalChan := make(chan os.Signal, 1)

signal.Notify(
signalChan,
syscall.SIGHUP, // kill -SIGHUP XXXX
syscall.SIGINT, // kill -SIGINT XXXX or Ctrl+c
syscall.SIGQUIT, // kill -SIGQUIT XXXX
)

w3rcAPI := api.NewAPI(10_000_000, opts...)

listener, err := net.Listen("tcp", c.String("listen"))
if err != nil {
return err
}
defer listener.Close()

gatewayConf := gateway.GatewayConfig{
NoDNSLink: true,
PublicGateways: map[string]*gateway.GatewaySpec{"ipfs.localhost": &gateway.GatewaySpec{
UseSubdomains: true,
NoDNSLink: true,
}},
}

server := gateway.Serve(w3rcAPI, &gatewayConf, listener,
gateway.HostnameOption(),
gateway.LogOption(),
gateway.MetricsCollectionOption("api"),
gateway.MetricsOpenCensusCollectionOption(),
gateway.VersionOption(),
gateway.WebUIOption,
gateway.GatewayOption("/ipfs"),
)

<-signalChan
cctx, cancel := context.WithTimeout(c.Context, 30*time.Second)
defer cancel()
return server.Shutdown(cctx)
}
18 changes: 15 additions & 3 deletions contentrouting/delegated/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ package delegated

import (
"context"
"fmt"

"github.com/filecoin-project/index-provider/metadata"
finderhttpclient "github.com/filecoin-project/storetheindex/api/v0/finder/client/http"
"github.com/filecoin-project/storetheindex/api/v0/finder/model"
"github.com/ipfs-shipyard/w3rc/contentrouting"
cid "github.com/ipfs/go-cid"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-varint"
)

// NewDelegatedHTTP makes a routing provider backed by an HTTP endpoint.
func NewDelegatedHTTP(url string) (contentrouting.Routing, error) {
fmt.Printf("url is: %s\n", url)
client, err := finderhttpclient.New(url)
if err != nil {
return nil, err
Expand All @@ -31,7 +35,6 @@ func (hr *HTTPRouter) FindProviders(ctx context.Context, c cid.Cid, _ ...content
ch := make(chan contentrouting.RoutingRecord, 1)
go func() {
defer close(ch)

parsedResp, err := hr.Client.Find(ctx, c.Hash())
if err != nil {
ch <- contentrouting.RecordError(c, err)
Expand Down Expand Up @@ -65,12 +68,21 @@ func (r *httpRecord) Request() cid.Cid {

// Protocol indicates that this record is an error
func (r *httpRecord) Protocol() multicodec.Code {
return r.Val.Metadata.ProtocolID
code, _, err := varint.FromUvarint(r.Val.Metadata)
if err == nil {
return multicodec.Code(code)
}
return 0
}

// Payload is the underlying error
func (r *httpRecord) Payload() interface{} {
return r.Val.Metadata.Data
md := metadata.Metadata{}
if err := md.UnmarshalBinary(r.Val.Metadata); err != nil {
return nil
}
fp := r.Protocol()
return md.Get(fp)
}

// Payload is the underlying error
Expand Down
17 changes: 9 additions & 8 deletions contentrouting/delegated/http_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package delegated_test

import (
"bytes"
"context"
"fmt"
"net"
"testing"

"github.com/filecoin-project/index-provider/metadata"
"github.com/ipfs-shipyard/w3rc/contentrouting"
"github.com/ipfs-shipyard/w3rc/contentrouting/delegated"
mockdelegatedrouter "github.com/ipfs-shipyard/w3rc/contentrouting/delegated/mock"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
p2ptestutil "github.com/libp2p/go-libp2p-netutil"
p2ptestutil "github.com/libp2p/go-libp2p-testing/netutil"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
Expand Down Expand Up @@ -45,16 +45,16 @@ func TestHTTPFetch(t *testing.T) {
multiaddr.StringCast("/ip4/127.0.0.1/tcp/8080/tls"),
},
}
serv.Add(foundCid, addr, 1, []byte("hello data"))
serv.Add(foundCid, addr, metadata.Bitswap{})
rcrdChan := cr.FindProviders(context.Background(), foundCid)
rcrds := doDrain(rcrdChan)
if len(rcrds) != 1 {
t.Fatalf("expected 1 record, got %d", len(rcrds))
}
if rcrds[0].Protocol() != 1 {
t.Fatalf("expected protocol '1', got %d", rcrds[0].Protocol())
if rcrds[0].Protocol() != multicodec.TransportBitswap {
t.Fatalf("expected bitswap protocol , got %d", rcrds[0].Protocol())
}
if !bytes.Equal(rcrds[0].Payload().([]byte), []byte("hello data")) {
if rcrds[0].Payload() != nil {
t.Fatal("unexpected payload")
}

Expand All @@ -68,13 +68,14 @@ func TestHTTPFetch(t *testing.T) {
}

// An invalid record:
serv.Add(otherCid, addr, contentrouting.RoutingErrorProtocol, []byte("error"))
proto := metadata.GraphsyncFilecoinV1{PieceCID: cid.NewCidV1(0, []byte{0}), VerifiedDeal: true, FastRetrieval: true}
serv.Add(otherCid, addr, &proto)
rcrdChan = cr.FindProviders(context.Background(), otherCid)
rcrds = doDrain(rcrdChan)
if len(rcrds) != 1 {
t.Fatalf("expected 1 record, got %d", len(rcrds))
}
if rcrds[0].Protocol() != contentrouting.RoutingErrorProtocol {
if rcrds[0].Protocol() != multicodec.TransportGraphsyncFilecoinv1 {
t.Fatalf("expected error, got %d", rcrds[0].Protocol())
}
}
Expand Down
17 changes: 9 additions & 8 deletions contentrouting/delegated/mock/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"net/http"
"path"

v0 "github.com/filecoin-project/storetheindex/api/v0"
"github.com/filecoin-project/index-provider/metadata"
"github.com/filecoin-project/storetheindex/api/v0/finder/model"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
)

Expand Down Expand Up @@ -56,17 +55,19 @@ func New() *MockDelegatedProvider {
}

// Add adds a record for the provider to return.
func (m *MockDelegatedProvider) Add(c cid.Cid, peerAddr peer.AddrInfo, protocol uint64, metadata []byte) error {
func (m *MockDelegatedProvider) Add(c cid.Cid, peerAddr peer.AddrInfo, md metadata.Protocol) error {
mdb := metadata.New(md)
mdbb, err := mdb.MarshalBinary()
if err != nil {
return err
}
m.records[string(c.Hash())] = model.MultihashResult{
Multihash: c.Hash(),
ProviderResults: []model.ProviderResult{
{
ContextID: []byte("something"),
Metadata: v0.Metadata{
Data: metadata,
ProtocolID: multicodec.Code(protocol),
},
Provider: peerAddr,
Metadata: mdbb,
Provider: peerAddr,
},
},
}
Expand Down
Loading