Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions cmd/ww/cat/cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var env util.IPFSEnv
func Command() *cli.Command {
return &cli.Command{
Name: "cat",
ArgsUsage: "<peer> <proc>",
ArgsUsage: "<peer> <proc> [method]",
Usage: "Connect to a peer and execute a procedure over a stream",
Description: `Connect to a specified peer and execute a procedure over a custom protocol stream.
The command will:
Expand All @@ -33,7 +33,8 @@ The command will:

Examples:
ww cat QmPeer123 /echo
ww cat 12D3KooW... /myproc`,
ww cat 12D3KooW... /myproc echo
ww cat 12D3KooW... /myproc poll`,
Flags: append([]cli.Flag{
&cli.StringFlag{
Name: "ipfs",
Expand All @@ -58,12 +59,13 @@ func Main(c *cli.Context) error {
ctx, cancel := context.WithTimeout(c.Context, c.Duration("timeout"))
defer cancel()

if c.NArg() != 2 {
return cli.Exit("cat requires exactly two arguments: <peer> <proc>", 1)
if c.NArg() < 3 {
return cli.Exit("cat requires 2-3 arguments: <peer> <proc> [method]", 1)
}

peerIDStr := c.Args().Get(0)
procName := c.Args().Get(1)
method := c.Args().Get(2)

// Parse peer ID
peerID, err := peer.Decode(peerIDStr)
Expand All @@ -73,6 +75,9 @@ func Main(c *cli.Context) error {

// Construct protocol ID
protocolID := protocol.ID("/ww/0.1.0/" + procName)
if method != "" && method != "poll" {
protocolID = protocol.ID("/ww/0.1.0/" + procName + "/" + method)
}

// Create libp2p host in client mode
h, err := util.NewClient()
Expand Down
29 changes: 25 additions & 4 deletions cmd/ww/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
"os"
"os/exec"
"path/filepath"
"strings"

"github.com/ipfs/boxo/path"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/experimental/sys"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -134,20 +136,39 @@ func Main(c *cli.Context) error {
"peer", env.Host.ID(),
"endpoint", p.Endpoint.Name)

env.Host.SetStreamHandler(p.Endpoint.Protocol(), func(s network.Stream) {
// Set up stream handler that matches both exact protocol and with method suffix
baseProto := p.Endpoint.Protocol()
env.Host.SetStreamHandlerMatch(baseProto, func(protocol protocol.ID) bool {
// Match exact base protocol (/ww/0.1.0/<proc-id>) or with method suffix (/ww/0.1.0/<proc-id>/<method>)
return protocol == baseProto || strings.HasPrefix(string(protocol), string(baseProto)+"/")
}, func(s network.Stream) {
defer s.CloseRead()

// Extract method from protocol string
method := "poll" // default
protocolStr := string(s.Protocol())
if strings.HasPrefix(protocolStr, string(baseProto)+"/") {
// Extract method from /ww/0.1.0/<proc-id>/<method>
parts := strings.Split(protocolStr, "/")
if len(parts) > 0 {
method = parts[len(parts)-1]
}
}

slog.InfoContext(ctx, "stream connected",
"peer", s.Conn().RemotePeer(),
"stream-id", s.ID(),
"endpoint", p.Endpoint.Name)
if err := p.Poll(ctx, s, nil); err != nil {
"endpoint", p.Endpoint.Name,
"method", method)
if err := p.ProcessMessage(ctx, s, method); err != nil {
slog.ErrorContext(ctx, "failed to poll process",
"id", p.ID(),
"stream", s.ID(),
"method", method,
"reason", err)
}
})
defer env.Host.RemoveStreamHandler(p.Endpoint.Protocol())
defer env.Host.RemoveStreamHandler(baseProto)

for {
select {
Expand Down
17 changes: 5 additions & 12 deletions examples/echo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,19 @@ import (
// main is the entry point for synchronous mode.
// It processes one complete message from stdin and exits.
func main() {
// Echo: copy stdin to stdout using io.Copy
// io.Copy uses an internal 32KB buffer by default
if _, err := io.Copy(os.Stdout, os.Stdin); err != nil {
os.Stderr.WriteString("Error copying stdin to stdout: " + err.Error() + "\n")
os.Exit(1)
}
defer os.Stdout.Sync()
// implicitly returns 0 to indicate successful completion
echo()
}

// poll is the async entry point for stream-based processing.
// echo is the async entry point for stream-based processing.
// This function is called by the wetware runtime when a new stream
// is established for this process.
//
//export poll
func poll() {
//export echo
func echo() {
// In async mode, we process each incoming stream
// This is the same logic as main() but for individual streams
if _, err := io.Copy(os.Stdout, os.Stdin); err != nil {
os.Stderr.WriteString("Error in poll: " + err.Error() + "\n")
os.Stderr.WriteString("Error in echo: " + err.Error() + "\n")
os.Exit(1)
}
defer os.Stdout.Sync()
Expand Down
Binary file modified examples/echo/main.wasm
Binary file not shown.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/lthibault/go-libp2p-inproc-transport v0.4.1
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multiaddr v0.16.0
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.11.1
github.com/tetratelabs/wazero v1.9.0
github.com/urfave/cli/v2 v2.27.5
Expand Down
252 changes: 252 additions & 0 deletions system/ipfs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
package system

import (
"context"
"io"
"io/fs"
"log/slog"
"runtime"
"time"

"github.com/ipfs/boxo/files"
"github.com/ipfs/boxo/path"
iface "github.com/ipfs/kubo/core/coreiface"
"github.com/pkg/errors"
)

var _ fs.FS = (*IPFS)(nil)

// An IPFS provides access to a hierarchical file system.
//
// The IPFS interface is the minimum implementation required of the file system.
// A file system may implement additional interfaces,
// such as [ReadFileFS], to provide additional or optimized functionality.
//
// [testing/fstest.TestFS] may be used to test implementations of an IPFS for
// correctness.
type IPFS struct {
Ctx context.Context
Root path.Path
Unix iface.UnixfsAPI
}

// Open opens the named file.
//
// When Open returns an error, it should be of type *PathError
// with the Op field set to "open", the Path field set to name,
// and the Err field describing the problem.
//
// Open should reject attempts to open names that do not satisfy
// fs.ValidPath(name), returning a *fs.PathError with Err set to
// fs.ErrInvalid or fs.ErrNotExist.
func (f IPFS) Open(name string) (fs.File, error) {
path, node, err := f.Resolve(f.Ctx, name)
if err != nil {
return nil, &fs.PathError{
Op: "open",
Path: name,
Err: err,
}
}

return &ipfsNode{
Path: path,
Node: node,
}, nil
}

func (f IPFS) Resolve(ctx context.Context, name string) (path.Path, files.Node, error) {
if pathInvalid(name) {
return nil, nil, fs.ErrInvalid
}

p, err := path.Join(f.Root, name)
if err != nil {
return nil, nil, err
}

node, err := f.Unix.Get(ctx, p)
return p, node, err
}

func pathInvalid(name string) bool {
return !fs.ValidPath(name)
}

func (f IPFS) Sub(dir string) (fs.FS, error) {
var root path.Path
var err error
if (f == IPFS{}) {
root, err = path.NewPath(dir)
} else {
root, err = path.Join(f.Root, dir)
}

return &IPFS{
Ctx: f.Ctx,
Root: root,
Unix: f.Unix,
}, err
}

var (
_ fs.FileInfo = (*ipfsNode)(nil)
_ fs.ReadDirFile = (*ipfsNode)(nil)
_ fs.DirEntry = (*ipfsNode)(nil)
)

// ipfsNode provides access to a single file. The fs.File interface is the minimum
// implementation required of the file. Directory files should also implement [ReadDirFile].
// A file may implement io.ReaderAt or io.Seeker as optimizations.
type ipfsNode struct {
Path path.Path
files.Node
}

// base name of the file
func (n ipfsNode) Name() string {
segs := n.Path.Segments()
return segs[len(segs)-1] // last segment is name
}

func (n *ipfsNode) Stat() (fs.FileInfo, error) {
return n, nil
}

// length in bytes for regular files; system-dependent for others
func (n ipfsNode) Size() int64 {
size, err := n.Node.Size()
if err != nil {
slog.Error("failed to obtain file size",
"path", n.Path,
"reason", err)
}

return size
}

// file mode bits
func (n ipfsNode) Mode() fs.FileMode {
switch n.Node.(type) {
case files.Directory:
return fs.ModeDir
default:
return 0 // regular read-only file
}
}

// modification time
func (n ipfsNode) ModTime() time.Time {
return time.Time{} // zero-value time
}

// abbreviation for Mode().IsDir()
func (n ipfsNode) IsDir() bool {
return n.Mode().IsDir()
}

// underlying data source (never returns nil)
func (n ipfsNode) Sys() any {
return n.Node
}

func (n ipfsNode) Read(b []byte) (int, error) {
switch node := n.Node.(type) {
case io.Reader:
return node.Read(b)
default:
return 0, errors.New("unreadable node")
}
}

// ReadDir reads the contents of the directory and returns
// a slice of up to max DirEntry values in directory order.
// Subsequent calls on the same file will yield further DirEntry values.
//
// If max > 0, ReadDir returns at most max DirEntry structures.
// In this case, if ReadDir returns an empty slice, it will return
// a non-nil error explaining why.
// At the end of a directory, the error is io.EOF.
// (ReadDir must return io.EOF itself, not an error wrapping io.EOF.)
//
// If max <= 0, ReadDir returns all the DirEntry values from the directory
// in a single slice. In this case, if ReadDir succeeds (reads all the way
// to the end of the directory), it returns the slice and a nil error.
// If it encounters an error before the end of the directory,
// ReadDir returns the DirEntry list read until that point and a non-nil error.
func (n ipfsNode) ReadDir(max int) (entries []fs.DirEntry, err error) {
root, ok := n.Node.(files.Directory)
if !ok {
return nil, errors.New("not a directory")
}

iter := root.Entries()
for iter.Next() {
name := iter.Name()
node := iter.Node()

// Callers will typically discard entries if they get a non-nill
// error, so we make sure nodes are eventually closed.
runtime.SetFinalizer(node, func(c io.Closer) {
if err := c.Close(); err != nil {
slog.Warn("unable to close node",
"name", name,
"reason", err)
}
})

var subpath path.Path
if subpath, err = path.Join(n.Path, name); err != nil {
return
}

entries = append(entries, &ipfsNode{
Path: subpath,
Node: node})

// got max items?
if max--; max == 0 {
return
}
}

// If we get here, it's because the iterator stopped. It either
// failed or is exhausted. Any other error has already caused us
// to return.
if iter.Err() != nil {
err = iter.Err() // failed
} else if max >= 0 {
err = io.EOF // exhausted
}

return
}

// Info returns the FileInfo for the file or subdirectory described by the entry.
// The returned FileInfo may be from the time of the original directory read
// or from the time of the call to Info. If the file has been removed or renamed
// since the directory read, Info may return an error satisfying errors.Is(err, ErrNotExist).
// If the entry denotes a symbolic link, Info reports the information about the link itself,
// not the link's target.
func (n *ipfsNode) Info() (fs.FileInfo, error) {
return n, nil
}

// Type returns the type bits for the entry.
// The type bits are a subset of the usual FileMode bits, those returned by the FileMode.Type method.
func (n ipfsNode) Type() fs.FileMode {
if n.Mode().IsDir() {
return fs.ModeDir
}

return 0
}

func (n ipfsNode) Write(b []byte) (int, error) {
dst, ok := n.Node.(io.Writer)
if ok {
return dst.Write(b)
}

return 0, errors.New("not writeable")
}
Loading
Loading