Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
name: Go

on:
push:
branches:
- main
pull_request:

jobs:
lint:
name: Lint
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: '1.24'
- name: Ensure go.mod and go.sum are tidy
run: |
go mod tidy
git diff --exit-code go.mod
git diff --exit-code go.sum
- name: Run Go Vet
run: go vet ./...
- name: Run Staticcheck
# https://github.com/dominikh/go-tools/releases/tag/2025.1.1
run: go run honnef.co/go/tools/cmd/staticcheck@b8ec13ce4d00445d75da053c47498e6f9ec5d7d6 ./...
test:
name: Test
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: '1.24'
- name: Run tests
run: go test ./...
172 changes: 172 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package relayx

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"

"github.com/ipni/go-indexer-core"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multihash"
)

var _ indexer.Interface = (*Client)(nil)

type Client struct {
*clientOptions
}

func NewClient() (*Client, error) {
opts, err := newClientOptions()
if err != nil {
return nil, err
}
return &Client{clientOptions: opts}, nil
}

func (c *Client) Get(multihash multihash.Multihash) ([]indexer.Value, bool, error) {
endpoint, err := url.JoinPath(c.serverAddr, "find", multihash.B58String())
if err != nil {
return nil, false, err
}
resp, err := c.client.Get(endpoint)
if err != nil {
return nil, false, err
}
defer func() { _ = resp.Body.Close() }()
switch resp.StatusCode {
case http.StatusOK:
var findResp FindGetResponse
if err := json.NewDecoder(resp.Body).Decode(&findResp); err != nil {
return nil, false, err
}
if len(findResp.Providers) == 0 {
return nil, false, nil
}
return findResp.Providers, true, nil
case http.StatusNotFound:
return nil, false, nil
default:
var errResp ErrorResponse
if err := json.NewDecoder(resp.Body).Decode(&errResp); err != nil {
return nil, false, err
}
return nil, false, fmt.Errorf("unsuccessful response %d: %s", resp.StatusCode, errResp.Error)
}
}

func (c *Client) Put(iv indexer.Value, entries ...multihash.Multihash) error {
endpoint, err := url.JoinPath(c.serverAddr, "ingest", iv.ProviderID.String(), base64.StdEncoding.EncodeToString(iv.ContextID))
if err != nil {
return err
}
body, err := json.Marshal(IngestPutRequest{
Entries: entries,
Metadata: iv.MetadataBytes,
})
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPut, endpoint, bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
switch resp.StatusCode {
case http.StatusAccepted:
return nil
default:
var errResp ErrorResponse
if err := json.NewDecoder(resp.Body).Decode(&errResp); err != nil {
return err
}
return fmt.Errorf("unsuccessful response %d: %s", resp.StatusCode, errResp.Error)
}
}

func (c *Client) Remove(indexer.Value, ...multihash.Multihash) error {
return errors.New("not supported")
}

func (c *Client) RemoveProvider(ctx context.Context, id peer.ID) error {
endpoint, err := url.JoinPath(c.serverAddr, "ingest", id.String())
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, endpoint, nil)
if err != nil {
return err
}
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
switch resp.StatusCode {
case http.StatusAccepted:
return nil
default:
var errResp ErrorResponse
if err := json.NewDecoder(resp.Body).Decode(&errResp); err != nil {
return err
}
return fmt.Errorf("unsuccessful response %d: %s", resp.StatusCode, errResp.Error)
}
}

func (c *Client) RemoveProviderContext(providerID peer.ID, contextID []byte) error {
endpoint, err := url.JoinPath(c.serverAddr, "ingest", providerID.String(), base64.StdEncoding.EncodeToString(contextID))
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodDelete, endpoint, nil)
if err != nil {
return err
}
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
switch resp.StatusCode {
case http.StatusAccepted:
return nil
default:
var errResp ErrorResponse
if err := json.NewDecoder(resp.Body).Decode(&errResp); err != nil {
return err
}
return fmt.Errorf("unsuccessful response %d: %s", resp.StatusCode, errResp.Error)
}
}

func (c *Client) Size() (int64, error) {
return 0, nil
}

func (c *Client) Flush() error {
return nil
}

func (c *Client) Close() error {
c.client.CloseIdleConnections()
return nil
}

func (c *Client) Iter() (indexer.Iterator, error) {
return nil, nil
}

func (c *Client) Stats() (*indexer.Stats, error) {
return nil, nil
}
47 changes: 47 additions & 0 deletions client_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package relayx

import (
"errors"
"net/http"
)

type (
ClientOption func(*clientOptions) error
clientOptions struct {
serverAddr string
client *http.Client
}
)

func newClientOptions(opts ...ClientOption) (*clientOptions, error) {
o := &clientOptions{
serverAddr: "http://localhost:8080",
client: http.DefaultClient,
}
for _, apply := range opts {
if err := apply(o); err != nil {
return nil, err
}
}
return o, nil
}

func WithServerAddr(addr string) ClientOption {
return func(o *clientOptions) error {
if addr == "" {
return errors.New("server address cannot be empty")
}
o.serverAddr = addr
return nil
}
}

func WithHttpClient(client *http.Client) ClientOption {
return func(o *clientOptions) error {
if client == nil {
return errors.New("http client cannot be nil")
}
o.client = client
return nil
}
}
105 changes: 105 additions & 0 deletions cmd/relayx/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package main

import (
"fmt"
"os"

ppebble "github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/bloom"
"github.com/ipfs/go-log/v2"
"github.com/ipni/go-indexer-core"
"github.com/ipni/go-indexer-core/store/pebble"
"github.com/ipni/relayx"
"github.com/urfave/cli/v2"
)

var logger = log.Logger("relayx/cmd")

func main() {
app := cli.App{
Name: "relayx",
Commands: []*cli.Command{
{
Name: "serve",
Usage: "Start the relayx server",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "listen",
Usage: "Address to listen on (default: :8080)",
Value: "0.0.0.0:8080",
},
&cli.StringFlag{
Name: "delegate",
Usage: "The underlying indexer implementation to which to delegate requests. Supported values: pebble",
Required: true,
},
&cli.PathFlag{
Name: "pebblePath",
Usage: "Data path of pebble database. Has no effect if --delegate is not pebble.",
Value: ".",
DefaultText: "Current working directory",
},
&cli.PathFlag{
Name: "pebbleOptions",
Usage: "Path to the pebble options file. Has no effect if --delegate is not pebble.",
DefaultText: "Default pebble options",
},
},
Action: func(cctx *cli.Context) error {
var delegate indexer.Interface
switch d := cctx.String("delegate"); d {
case "pebble":
opts := (&ppebble.Options{}).EnsureDefaults()
if cctx.IsSet("pebbleOptions") {
popts, err := os.ReadFile(cctx.Path("pebbleOptions"))
if err != nil {
return fmt.Errorf("failed to read pebble options file: %w", err)
}
if err := opts.Parse(string(popts), &ppebble.ParseHooks{
NewCache: func(size int64) *ppebble.Cache {
return ppebble.NewCache(size)
},
NewFilterPolicy: func(name string) (ppebble.FilterPolicy, error) {
switch name {
case "none":
return nil, nil
case "rocksdb.BuiltinBloomFilter":
return bloom.FilterPolicy(10), nil
default:
return nil, fmt.Errorf("unknown filter policy: %s", name)
}
},
}); err != nil {
return fmt.Errorf("failed to parse pebble options: %w", err)
}
}
var err error
delegate, err = pebble.New(cctx.String("pebblePath"), opts)
if err != nil {
return fmt.Errorf("failed to create pebble indexer: %w", err)
}
default:
return fmt.Errorf("unknown delegate: %s", d)
}
server, err := relayx.NewServer(
relayx.WithListenAddr(cctx.String("listen")),
relayx.WithDelegateIndexer(delegate))
if err != nil {
return err
}
if err := server.Start(); err != nil {
return err
}
logger.Infow("Relayx server started", "address", cctx.String("listen"))
<-cctx.Context.Done()
logger.Info("Stopping relayx server")
return server.Stop()
},
},
},
}
if err := app.Run(os.Args); err != nil {
logger.Error("Error running app", "error", err)
os.Exit(1)
}
}
Loading