Skip to content
This repository has been archived by the owner on Feb 7, 2024. It is now read-only.

Commit

Permalink
feat: pubsub with multibase
Browse files Browse the repository at this point in the history
This updates HTTP RPC wire format to one from
ipfs/kubo#8183
  • Loading branch information
lidel committed Oct 15, 2021
1 parent 39cfea2 commit d69d417
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 20 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ module github.com/ipfs/go-ipfs-api

require (
github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927
github.com/ipfs/go-ipfs-files v0.0.8
github.com/ipfs/go-ipfs-files v0.0.9
github.com/ipfs/go-ipfs-util v0.0.2
github.com/libp2p/go-libp2p-core v0.6.1
github.com/mitchellh/go-homedir v1.1.0
github.com/multiformats/go-multiaddr v0.3.0
github.com/multiformats/go-multibase v0.0.3
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c
)
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ipfs/go-cid v0.0.7 h1:ysQJVJA3fNDF1qigJbsSQOdjhVLsOEoPdh0+R97k3jY=
github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I=
github.com/ipfs/go-ipfs-files v0.0.8 h1:8o0oFJkJ8UkO/ABl8T6ac6tKF3+NIpj67aAB6ZpusRg=
github.com/ipfs/go-ipfs-files v0.0.8/go.mod h1:wiN/jSG8FKyk7N0WyctKSvq3ljIa2NNTiZB55kpTdOs=
github.com/ipfs/go-ipfs-files v0.0.9 h1:OFyOfmuVDu9c5YtjSDORmwXzE6fmZikzZpzsnNkgFEg=
github.com/ipfs/go-ipfs-files v0.0.9/go.mod h1:aFv2uQ/qxWpL/6lidWvnSQmaVqCrf0TBGoUr+C1Fo84=
github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8=
github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdrtMecczcsQ=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
Expand Down Expand Up @@ -89,8 +89,9 @@ github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c h1:GGsyl0dZ2jJgVT+VvWBf/cNijrHRhkrTjkmp5wg7li0=
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c/go.mod h1:xxcJeBb7SIUl/Wzkz1eVKJE/CB34YNrqX2TQI6jY9zs=
go.opencensus.io v0.22.4 h1:LYy1Hy3MJdrCdMwwzxA/dRok4ejH+RwNGbuoD9fCjto=
Expand Down Expand Up @@ -139,6 +140,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
35 changes: 28 additions & 7 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package shell

import (
"encoding/json"
"fmt"
"io"

"github.com/libp2p/go-libp2p-core/peer"
mbase "github.com/multiformats/go-multibase"
)

// Message is a pubsub message.
Expand All @@ -31,26 +33,45 @@ func newPubSubSubscription(resp io.ReadCloser) *PubSubSubscription {
// Next waits for the next record and returns that.
func (s *PubSubSubscription) Next() (*Message, error) {
var r struct {
From []byte `json:"from,omitempty"`
Data []byte `json:"data,omitempty"`
Seqno []byte `json:"seqno,omitempty"`
From string `json:"from,omitempty"`
Data string `json:"data,omitempty"`
Seqno string `json:"seqno,omitempty"`
TopicIDs []string `json:"topicIDs,omitempty"`
}

err := s.dec.Decode(&r)
if err != nil {
return nil, err
}
fmt.Printf("%+v \n", r)

from, err := peer.IDFromBytes(r.From)
// fields are wrapped in multibase when sent over HTTP RPC
// and need to be decoded (https://github.com/ipfs/go-ipfs/pull/8183)
from, err := peer.Decode(r.From)
if err != nil {
return nil, err
}
_, data, err := mbase.Decode(r.Data)
if err != nil {
return nil, err
}
_, seqno, err := mbase.Decode(r.Seqno)
if err != nil {
return nil, err
}
topics := make([]string, len(r.TopicIDs))
for _, mbtopic := range r.TopicIDs {
_, topic, err := mbase.Decode(mbtopic)
if err != nil {
return nil, err
}
topics = append(topics, string(topic))
}
return &Message{
From: from,
Data: r.Data,
Seqno: r.Seqno,
TopicIDs: r.TopicIDs,
Data: data,
Seqno: seqno,
TopicIDs: topics,
}, nil
}

Expand Down
13 changes: 11 additions & 2 deletions shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
homedir "github.com/mitchellh/go-homedir"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
mbase "github.com/multiformats/go-multibase"
tar "github.com/whyrusleeping/tar-utils"

p2pmetrics "github.com/libp2p/go-libp2p-core/metrics"
Expand Down Expand Up @@ -516,7 +517,8 @@ func (s *Shell) ObjectPut(obj *IpfsObject) (string, error) {

func (s *Shell) PubSubSubscribe(topic string) (*PubSubSubscription, error) {
// connect
resp, err := s.Request("pubsub/sub", topic).Send(context.Background())
encoder, _ := mbase.EncoderByName("base64url")
resp, err := s.Request("pubsub/sub", encoder.Encode([]byte(topic))).Send(context.Background())
if err != nil {
return nil, err
}
Expand All @@ -528,7 +530,14 @@ func (s *Shell) PubSubSubscribe(topic string) (*PubSubSubscription, error) {
}

func (s *Shell) PubSubPublish(topic, data string) (err error) {
resp, err := s.Request("pubsub/pub", topic, data).Send(context.Background())

fr := files.NewReaderFile(bytes.NewReader([]byte(data)))
slf := files.NewSliceDirectory([]files.DirEntry{files.FileEntry("", fr)})
fileReader := files.NewMultiFileReader(slf, true)

encoder, _ := mbase.EncoderByName("base64url")
resp, err := s.Request("pubsub/pub", encoder.Encode([]byte(topic))).
Body(fileReader).Send(context.Background())
if err != nil {
return err
}
Expand Down
14 changes: 8 additions & 6 deletions shell_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,9 @@ func TestPubSub(t *testing.T) {
s := NewShell(shellUrl)

var (
topic = "test"
topic = fmt.Sprintf("test\n topic\r\t with unsafe bytes")
payload1 = fmt.Sprintf("Hello\r\nWorld\t!")
payload2 = fmt.Sprintf("Hallo\r\nWelt\t!!11oneonę")

sub *PubSubSubscription
err error
Expand All @@ -376,7 +378,7 @@ func TestPubSub(t *testing.T) {
time.Sleep(10 * time.Millisecond)

t.Log("publishing...")
is.Nil(s.PubSubPublish(topic, "Hello World!"))
is.Nil(s.PubSubPublish(topic, payload1))
t.Log("pub: done")

t.Log("next()...")
Expand All @@ -385,23 +387,23 @@ func TestPubSub(t *testing.T) {

is.Nil(err)
is.NotNil(r)
is.Equal(r.Data, "Hello World!")
is.Equal(r.Data, payload1)

sub2, err := s.PubSubSubscribe(topic)
is.Nil(err)
is.NotNil(sub2)

is.Nil(s.PubSubPublish(topic, "Hallo Welt!"))
is.Nil(s.PubSubPublish(topic, payload2))

r, err = sub2.Next()
is.Nil(err)
is.NotNil(r)
is.Equal(r.Data, "Hallo Welt!")
is.Equal(r.Data, payload2)

r, err = sub.Next()
is.NotNil(r)
is.Nil(err)
is.Equal(r.Data, "Hallo Welt!")
is.Equal(r.Data, payload2)

is.Nil(sub.Cancel())
}
Expand Down

0 comments on commit d69d417

Please sign in to comment.