Skip to content

Commit

Permalink
Cleanup message handling code (#136)
Browse files Browse the repository at this point in the history
* Remove the "Type" field from Message models

* Message handlers accept concrete types

* Fix flag handling + update integration tests

* Filter messages depending on where they come from (publish vs direct)
  • Loading branch information
Maelkum authored Apr 5, 2024
1 parent d28be96 commit 60a343d
Show file tree
Hide file tree
Showing 47 changed files with 444 additions and 410 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
cmd/node/node
cmd/node/.b7s_*
cmd/node/*.yaml

cmd/keygen/keygen
cmd/keyforge/keyforge
Expand Down
12 changes: 4 additions & 8 deletions api/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,8 @@ import (
// Execute implements the REST API endpoint for function execution.
func (a *API) Health(ctx echo.Context) error {

// respond with health check
resp := response.Health{
Type: "health",
Code: http.StatusOK,
}

// Send the response.
return ctx.JSON(http.StatusOK, resp)
return ctx.JSON(
http.StatusOK,
response.Health{Code: http.StatusOK},
)
}
6 changes: 3 additions & 3 deletions cmd/node/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ log:
connectivity:
address: 127.0.0.1
port: 9000
private-key: ~/.b7s/path/to/priv/key.bin
private-key: /home/user/.b7s/path/to/priv/key.bin
websocket: true


worker:
runtime-path: ~/.local/blockless-runtime/bin
runtime-path: /home/user/.local/blockless-runtime/bin
cpu-percentage-limit: 0.8

```
Expand All @@ -101,7 +101,7 @@ The created `node` will listen on all addresses on TCP port 9000.
Database used to persist Node data between runs will be created in the `peer-database` subdirectory.
On the other hand, Node will persist function data in the default database, in the `function-db` subdirectory.

Blockless Runtime path is given as `~/.local/bin`.
Blockless Runtime path is given as `/home/user/.local/bin`.
At startup, node will check if the Blockless Runtime is actually found there, namely the [bls-runtime](https://blockless.network/docs/protocol/runtime).

Node Identity will be determined by the private key found in `priv.bin` file in the `keys` subdirectory.
Expand Down
2 changes: 1 addition & 1 deletion cmd/node/internal/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const (
DefaultFunctionDB = "function-db"
DefaultConcurrency = uint(node.DefaultConcurrency)
DefaultUseWebsocket = false
DefaultWorkspace = ""
DefaultWorkspace = "workspace"
)

type configOption struct {
Expand Down
6 changes: 3 additions & 3 deletions cmd/node/internal/config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ func load(args []string) (*Config, error) {
flags.stringFlag(roleCfg, DefaultRole)
flags.uintFlag(concurrencyCfg, DefaultConcurrency)
flags.stringSliceFlag(bootNodesCfg, nil)
flags.stringFlag(workspaceCfg, DefaultWorkspace)
flags.stringFlag(workspaceCfg, "")
flags.boolFlag(attributesCfg, false)
flags.stringFlag(peerDBCfg, DefaultPeerDB)
flags.stringFlag(functionDBCfg, DefaultFunctionDB)
flags.stringFlag(peerDBCfg, "")
flags.stringFlag(functionDBCfg, "")
flags.stringSliceFlag(topicsCfg, nil)

// Log.
Expand Down
1 change: 0 additions & 1 deletion consensus/pbft/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func (r *Replica) execute(view uint, sequence uint, digest string) error {
r.lastExecuted = sequence

msg := response.Execute{
Type: blockless.MessageExecuteResponse,
Code: res.Code,
RequestID: request.ID,
Results: execute.ResultMap{
Expand Down
27 changes: 7 additions & 20 deletions models/blockless/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,17 @@ package blockless
// Message types in the Blockless protocol.
const (
MessageHealthCheck = "MsgHealthCheck"
MessageExecute = "MsgExecute"
MessageExecuteResult = "MsgExecuteResult"
MessageExecuteError = "MsgExecuteError"
MessageExecuteTimeout = "MsgExecuteTimeout"
MessageExecuteUnknown = "MsgExecuteUnknown"
MessageExecuteInvalid = "MsgExecuteInvalid"
MessageExecuteNotFound = "MsgExecuteNotFound"
MessageExecuteNotSupported = "MsgExecuteNotSupported"
MessageExecuteNotImplemented = "MsgExecuteNotImplemented"
MessageExecuteNotAuthorized = "MsgExecuteNotAuthorized"
MessageExecuteNotPermitted = "MsgExecuteNotPermitted"
MessageExecuteNotAvailable = "MsgExecuteNotAvailable"
MessageExecuteNotReady = "MsgExecuteNotReady"
MessageExecuteNotConnected = "MsgExecuteNotConnected"
MessageExecuteNotInitialized = "MsgExecuteNotInitialized"
MessageExecuteNotConfigured = "MsgExecuteNotConfigured"
MessageExecuteNotInstalled = "MsgExecuteNotInstalled"
MessageExecuteNotUpgraded = "MsgExecuteNotUpgraded"
MessageInstallFunction = "MsgInstallFunction"
MessageInstallFunctionResponse = "MsgInstallFunctionResponse"
MessageRollCall = "MsgRollCall"
MessageRollCallResponse = "MsgRollCallResponse"
MessageExecute = "MsgExecute"
MessageExecuteResponse = "MsgExecuteResponse"
MessageInstallFunction = "MsgInstallFunction"
MessageInstallFunctionResponse = "MsgInstallFunctionResponse"
MessageFormCluster = "MsgFormCluster"
MessageFormClusterResponse = "MsgFormClusterResponse"
MessageDisbandCluster = "MsgDisbandCluster"
)

type Message interface {
Type() string
}
2 changes: 1 addition & 1 deletion models/execute/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Parameter struct {

// Config represents the configurable options for an execution request.
type Config struct {
Runtime BLSRuntimeConfig `json:"runtime,omitempty"`
Runtime BLSRuntimeConfig `json:"runtime,omitempty"`
Environment []EnvVar `json:"env_vars,omitempty"`
Stdin *string `json:"stdin,omitempty"`
Permissions []string `json:"permissions,omitempty"`
Expand Down
24 changes: 20 additions & 4 deletions models/request/disband_cluster.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,29 @@
package request

import (
"github.com/libp2p/go-libp2p/core/peer"
"encoding/json"

"github.com/blocklessnetwork/b7s/models/blockless"
)

var _ (json.Marshaler) = (*DisbandCluster)(nil)

// DisbandCluster describes the `MessageDisbandCluster` request payload.
// It is sent after head node receives the leaders execution response.
type DisbandCluster struct {
Type string `json:"type,omitempty"`
From peer.ID `json:"from,omitempty"`
RequestID string `json:"request_id,omitempty"`
RequestID string `json:"request_id,omitempty"`
}

func (DisbandCluster) Type() string { return blockless.MessageDisbandCluster }

func (d DisbandCluster) MarshalJSON() ([]byte, error) {
type Alias DisbandCluster
rec := struct {
Alias
Type string `json:"type"`
}{
Alias: Alias(d),
Type: d.Type(),
}
return json.Marshal(rec)
}
31 changes: 20 additions & 11 deletions models/request/execute.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,34 @@
package request

import (
"encoding/json"
"time"

"github.com/libp2p/go-libp2p/core/peer"

"github.com/blocklessnetwork/b7s/models/blockless"
"github.com/blocklessnetwork/b7s/models/execute"
)

var _ (json.Marshaler) = (*Execute)(nil)

// Execute describes the `MessageExecute` request payload.
type Execute struct {
Type string `json:"type,omitempty"`
From peer.ID `json:"from,omitempty"`
Code string `json:"code,omitempty"`
Topic string `json:"topic,omitempty"`

execute.Request // execute request is embedded.

// RequestID may be set initially, if the execution request is relayed via roll-call.
RequestID string `json:"request_id,omitempty"`
Topic string `json:"topic,omitempty"`
RequestID string `json:"request_id,omitempty"` // RequestID may be set initially, if the execution request is relayed via roll-call.
Timestamp time.Time `json:"timestamp,omitempty"` // Execution request timestamp is a factor for PBFT.
}

func (Execute) Type() string { return blockless.MessageExecute }

// Execution request timestamp is a factor for PBFT.
Timestamp time.Time `json:"timestamp,omitempty"`
func (e Execute) MarshalJSON() ([]byte, error) {
type Alias Execute
rec := struct {
Alias
Type string `json:"type"`
}{
Alias: Alias(e),
Type: e.Type(),
}
return json.Marshal(rec)
}
21 changes: 19 additions & 2 deletions models/request/form_cluster.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,34 @@
package request

import (
"encoding/json"

"github.com/libp2p/go-libp2p/core/peer"

"github.com/blocklessnetwork/b7s/consensus"
"github.com/blocklessnetwork/b7s/models/blockless"
)

var _ (json.Marshaler) = (*FormCluster)(nil)

// FormCluster describes the `MessageFormCluster` request payload.
// It is sent on clustered execution of a request.
type FormCluster struct {
Type string `json:"type,omitempty"`
From peer.ID `json:"from,omitempty"`
RequestID string `json:"request_id,omitempty"`
Peers []peer.ID `json:"peers,omitempty"`
Consensus consensus.Type `json:"consensus,omitempty"`
}

func (FormCluster) Type() string { return blockless.MessageFormCluster }

func (f FormCluster) MarshalJSON() ([]byte, error) {
type Alias FormCluster
rec := struct {
Alias
Type string `json:"type"`
}{
Alias: Alias(f),
Type: f.Type(),
}
return json.Marshal(rec)
}
26 changes: 21 additions & 5 deletions models/request/install_function.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,29 @@
package request

import (
"github.com/libp2p/go-libp2p/core/peer"
"encoding/json"

"github.com/blocklessnetwork/b7s/models/blockless"
)

var _ (json.Marshaler) = (*InstallFunction)(nil)

// InstallFunction describes the `MessageInstallFunction` request payload.
type InstallFunction struct {
Type string `json:"type,omitempty"`
From peer.ID `json:"from,omitempty"`
ManifestURL string `json:"manifest_url,omitempty"`
CID string `json:"cid,omitempty"`
ManifestURL string `json:"manifest_url,omitempty"`
CID string `json:"cid,omitempty"`
}

func (InstallFunction) Type() string { return blockless.MessageInstallFunction }

func (f InstallFunction) MarshalJSON() ([]byte, error) {
type Alias InstallFunction
rec := struct {
Alias
Type string `json:"type"`
}{
Alias: Alias(f),
Type: f.Type(),
}
return json.Marshal(rec)
}
21 changes: 19 additions & 2 deletions models/request/roll_call.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,36 @@
package request

import (
"encoding/json"

"github.com/libp2p/go-libp2p/core/peer"

"github.com/blocklessnetwork/b7s/consensus"
"github.com/blocklessnetwork/b7s/models/blockless"
"github.com/blocklessnetwork/b7s/models/execute"
)

var _ (json.Marshaler) = (*RollCall)(nil)

// RollCall describes the `MessageRollCall` message payload.
type RollCall struct {
From peer.ID `json:"from,omitempty"`
Type string `json:"type,omitempty"`
Origin peer.ID `json:"origin,omitempty"` // Origin is the peer that initiated the roll call.
FunctionID string `json:"function_id,omitempty"`
RequestID string `json:"request_id,omitempty"`
Consensus consensus.Type `json:"consensus"`
Attributes *execute.Attributes `json:"attributes,omitempty"`
}

func (RollCall) Type() string { return blockless.MessageRollCall }

func (r RollCall) MarshalJSON() ([]byte, error) {
type Alias RollCall
rec := struct {
Alias
Type string `json:"type"`
}{
Alias: Alias(r),
Type: r.Type(),
}
return json.Marshal(rec)
}
21 changes: 17 additions & 4 deletions models/response/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"

"github.com/blocklessnetwork/b7s/models/blockless"
"github.com/blocklessnetwork/b7s/models/codes"
"github.com/blocklessnetwork/b7s/models/execute"
)

var _ (json.Marshaler) = (*Execute)(nil)

// Execute describes the response to the `MessageExecute` message.
type Execute struct {
Type string `json:"type,omitempty"`
RequestID string `json:"request_id,omitempty"`
From peer.ID `json:"from,omitempty"`
Code codes.Code `json:"code,omitempty"`
Results execute.ResultMap `json:"results,omitempty"`
Cluster execute.Cluster `json:"cluster,omitempty"`
Expand All @@ -31,6 +32,20 @@ type Execute struct {
Message string `json:"message,omitempty"`
}

func (Execute) Type() string { return blockless.MessageExecuteResponse }

func (e Execute) MarshalJSON() ([]byte, error) {
type Alias Execute
rec := struct {
Alias
Type string `json:"type"`
}{
Alias: Alias(e),
Type: e.Type(),
}
return json.Marshal(rec)
}

type PBFTResultInfo struct {
View uint `json:"view"`
RequestTimestamp time.Time `json:"request_timestamp,omitempty"`
Expand All @@ -42,7 +57,6 @@ func (e *Execute) Sign(key crypto.PrivKey) error {
// Exclude signature and the `from` field from the signature.
cp := *e
cp.Signature = ""
cp.From = ""

payload, err := json.Marshal(cp)
if err != nil {
Expand All @@ -63,7 +77,6 @@ func (e Execute) VerifySignature(key crypto.PubKey) error {
// Exclude signature and the `from` field from the signature.
cp := e
cp.Signature = ""
cp.From = ""

payload, err := json.Marshal(cp)
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions models/response/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/stretchr/testify/require"

"github.com/blocklessnetwork/b7s/models/blockless"
"github.com/blocklessnetwork/b7s/models/codes"
"github.com/blocklessnetwork/b7s/models/execute"
"github.com/blocklessnetwork/b7s/testing/mocks"
Expand All @@ -15,9 +14,7 @@ import (
func TestExecute_Signing(t *testing.T) {

sampleRes := Execute{
Type: blockless.MessageExecuteResponse,
RequestID: mocks.GenericUUID.String(),
From: mocks.GenericPeerID,
Code: codes.OK,
Results: execute.ResultMap{
mocks.GenericPeerID: mocks.GenericExecutionResult,
Expand Down
Loading

0 comments on commit 60a343d

Please sign in to comment.