Skip to content

Commit

Permalink
Execution requested from multiple nodes (#81)
Browse files Browse the repository at this point in the history
* Move status codes to a separate package

* Change WaitMap implementation - use Context for waiting on value

* Basic implementation of executing on multiple nodes

* Move API models back to `api` package

* Update API models

* Quorum is configurable

* Add CLI option for specifying quorum

* Update and fix tests

* Determine code when working with multiple execution results

* Add more debug logs

* Fix timeout for execution

* Remove CLI option for setting quorum

* Quorum is determined by the request - not by node settings

* Move roll call filtering to a more appropriate function

* Add reason for failure to the API output

* Fix handling of roll call - checking connections is not correct in tests

* Fix logging of send errors on roll call
  • Loading branch information
Maelkum authored Apr 24, 2023
1 parent e49295e commit df751a3
Show file tree
Hide file tree
Showing 42 changed files with 637 additions and 323 deletions.
72 changes: 57 additions & 15 deletions api/execute.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,41 @@
package api

import (
"errors"
"fmt"
"net/http"

"github.com/labstack/echo/v4"

"github.com/blocklessnetworking/b7s/models/api/request"
"github.com/blocklessnetworking/b7s/models/api/response"
"github.com/blocklessnetworking/b7s/models/blockless"
"github.com/blocklessnetworking/b7s/models/codes"
"github.com/blocklessnetworking/b7s/models/execute"
)

// ExecuteRequest describes the payload for the REST API request for function execution.
type ExecuteRequest execute.Request

// ExecuteResponse describes the REST API response for function execution.
type ExecuteResponse struct {
Code codes.Code `json:"code,omitempty"`
RequestID string `json:"request_id,omitempty"`
Message string `json:"message,omitempty"`
Results map[string]ExecuteResult `json:"results,omitempty"`
}

// ExecuteResult represents the API representation of a single execution response.
// It is similar to the model in `execute.Result`, except it omits the usage information for now.
type ExecuteResult struct {
Code codes.Code `json:"code,omitempty"`
Result execute.RuntimeOutput `json:"result,omitempty"`
RequestID string `json:"request_id,omitempty"`
}

// Execute implements the REST API endpoint for function execution.
func (a *API) Execute(ctx echo.Context) error {

// Unpack the API request.
var req request.Execute
var req ExecuteRequest
err := ctx.Bind(&req)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, fmt.Errorf("could not unpack request: %w", err))
Expand All @@ -24,22 +44,44 @@ func (a *API) Execute(ctx echo.Context) error {
// TODO: Check - We perhaps want to return the request ID and not wait for the execution, right?
// It's probable that it will time out anyway, right?

// Get the execution result.
result, err := a.node.ExecuteFunction(ctx.Request().Context(), execute.Request(req))
// Determine status code.
code := http.StatusOK
// Get the execution results.
code, results, err := a.node.ExecuteFunction(ctx.Request().Context(), execute.Request(req))
if err != nil {
code = http.StatusInternalServerError
a.log.Warn().
Str("function_id", req.FunctionID).
Err(err).
Msg("node failed to execute function")
}

requestID := ""
exResults := make(map[string]ExecuteResult)

for id, er := range results {

// Get the requestID from any of the individual results.
if requestID == "" {
requestID = er.RequestID
}

exResults[id] = ExecuteResult{
Code: er.Code,
Result: er.Result,
RequestID: er.RequestID,
}
}

// Transform the node response format to the one returned by the API.
res := ExecuteResponse{
Code: code,
RequestID: requestID,
Results: exResults,
}

// Create the API response.
res := response.Execute{
Code: result.Code,
RequestID: result.RequestID,
Result: result.Result.Stdout,
ResultEx: result.Result,
// Communicate the reason for failure in these cases.
if errors.Is(err, blockless.ErrRollCallTimeout) || errors.Is(err, blockless.ErrExecutionNotEnoughNodes) {
res.Message = err.Error()
}

// Send the response.
return ctx.JSON(code, res)
return ctx.JSON(http.StatusOK, res)
}
44 changes: 27 additions & 17 deletions api/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,34 @@ import (
"github.com/stretchr/testify/require"

"github.com/blocklessnetworking/b7s/api"
"github.com/blocklessnetworking/b7s/models/api/response"
"github.com/blocklessnetworking/b7s/models/codes"
"github.com/blocklessnetworking/b7s/models/execute"
"github.com/blocklessnetworking/b7s/testing/mocks"
)

func TestAPI_Execute(t *testing.T) {

api := setupAPI(t)
srv := setupAPI(t)

req := mocks.GenericExecutionRequest

rec, ctx, err := setupRecorder(executeEndpoint, req)
require.NoError(t, err)

err = api.Execute(ctx)
err = srv.Execute(ctx)
require.NoError(t, err)

var res response.Execute
var res api.ExecuteResponse
require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &res))

require.Equal(t, http.StatusOK, rec.Result().StatusCode)

require.Equal(t, mocks.GenericExecutionResult.Code, res.Code)
require.Equal(t, mocks.GenericExecutionResult.RequestID, res.RequestID)
require.Equal(t, mocks.GenericExecutionResult.Result.Stdout, res.Result)
require.Equal(t, mocks.GenericExecutionResult.Result, res.ResultEx)
require.Len(t, res.Results, 1)

peerID := mocks.GenericPeerID.String()
require.Equal(t, mocks.GenericExecutionResult.RequestID, res.Results[peerID].RequestID)
require.Equal(t, mocks.GenericExecutionResult.Result, res.Results[peerID].Result)
}

func TestAPI_Execute_HandlesErrors(t *testing.T) {
Expand All @@ -47,29 +50,37 @@ func TestAPI_Execute_HandlesErrors(t *testing.T) {
},
}

peerID := mocks.GenericPeerID.String()

expectedCode := codes.Error

results := map[string]execute.Result{
peerID: executionResult,
}

node := mocks.BaselineNode(t)
node.ExecuteFunctionFunc = func(context.Context, execute.Request) (execute.Result, error) {
return executionResult, mocks.GenericError
node.ExecuteFunctionFunc = func(context.Context, execute.Request) (codes.Code, map[string]execute.Result, error) {
return expectedCode, results, mocks.GenericError
}

api := api.New(mocks.NoopLogger, node)
srv := api.New(mocks.NoopLogger, node)

req := mocks.GenericExecutionRequest

rec, ctx, err := setupRecorder(executeEndpoint, req)
require.NoError(t, err)

err = api.Execute(ctx)
err = srv.Execute(ctx)
require.NoError(t, err)

var res response.Execute
var res api.ExecuteResponse
err = json.Unmarshal(rec.Body.Bytes(), &res)
require.NoError(t, err)

require.Equal(t, http.StatusInternalServerError, rec.Result().StatusCode)
require.Equal(t, executionResult.Code, res.Code)
require.Equal(t, executionResult.Result.Stdout, res.Result)
require.Equal(t, executionResult.Result, res.ResultEx)
require.Equal(t, http.StatusOK, rec.Result().StatusCode)
require.Equal(t, expectedCode, res.Code)

require.Equal(t, results[peerID].Result, res.Results[peerID].Result)
}

func TestAPI_Execute_HandlesMalformedRequests(t *testing.T) {
Expand Down Expand Up @@ -143,5 +154,4 @@ func TestAPI_Execute_HandlesMalformedRequests(t *testing.T) {
require.Equal(t, http.StatusBadRequest, echoErr.Code)
})
}

}
15 changes: 12 additions & 3 deletions api/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,27 @@ import (
"time"

"github.com/labstack/echo/v4"

"github.com/blocklessnetworking/b7s/models/api/request"
)

const (
functionInstallTimeout = 10 * time.Second
)

// InstallFunctionRequest describes the payload for the REST API request for function install.
type InstallFunctionRequest struct {
CID string `json:"cid"`
URI string `json:"uri"`
}

// InstallFunctionResponse describes the REST API response for the function install.
type InstallFunctionResponse struct {
Code string `json:"code"`
}

func (a *API) Install(ctx echo.Context) error {

// Unpack the API request.
var req request.InstallFunction
var req InstallFunctionRequest
err := ctx.Bind(&req)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, fmt.Errorf("could not unpack request: %w", err))
Expand Down
38 changes: 18 additions & 20 deletions api/install_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,24 @@ import (
"github.com/stretchr/testify/require"

"github.com/blocklessnetworking/b7s/api"
"github.com/blocklessnetworking/b7s/models/api/request"
"github.com/blocklessnetworking/b7s/models/response"
"github.com/blocklessnetworking/b7s/testing/mocks"
)

func TestAPI_FunctionInstall(t *testing.T) {
t.Run("nominal case", func(t *testing.T) {
t.Parallel()

api := setupAPI(t)

req := request.InstallFunction{
req := api.InstallFunctionRequest{
URI: "dummy-function-id",
CID: "dummy-cid",
}

srv := setupAPI(t)

rec, ctx, err := setupRecorder(installEndpoint, req)
require.NoError(t, err)

err = api.Install(ctx)
err = srv.Install(ctx)
require.NoError(t, err)

require.Equal(t, http.StatusOK, rec.Result().StatusCode)
Expand All @@ -42,17 +40,17 @@ func TestAPI_FunctionInstall_HandlesErrors(t *testing.T) {
t.Run("missing URI and CID", func(t *testing.T) {
t.Parallel()

api := setupAPI(t)

req := request.InstallFunction{
req := api.InstallFunctionRequest{
URI: "",
CID: "",
}

srv := setupAPI(t)

_, ctx, err := setupRecorder(installEndpoint, req)
require.NoError(t, err)

err = api.Install(ctx)
err = srv.Install(ctx)
require.Error(t, err)

echoErr, ok := err.(*echo.HTTPError)
Expand All @@ -74,22 +72,22 @@ func TestAPI_FunctionInstall_HandlesErrors(t *testing.T) {
return nil
}

api := api.New(mocks.NoopLogger, node)

req := request.InstallFunction{
req := api.InstallFunctionRequest{
URI: "dummy-uri",
CID: "dummy-cid",
}

srv := api.New(mocks.NoopLogger, node)

rec, ctx, err := setupRecorder(installEndpoint, req)
require.NoError(t, err)

err = api.Install(ctx)
err = srv.Install(ctx)
require.NoError(t, err)

require.Equal(t, http.StatusOK, rec.Result().StatusCode)

var res = response.InstallFunction{}
var res api.InstallFunctionResponse
require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &res))

num, err := strconv.Atoi(res.Code)
Expand All @@ -105,17 +103,17 @@ func TestAPI_FunctionInstall_HandlesErrors(t *testing.T) {
return mocks.GenericError
}

api := api.New(mocks.NoopLogger, node)
srv := api.New(mocks.NoopLogger, node)

req := request.InstallFunction{
req := api.InstallFunctionRequest{
URI: "dummy-uri",
CID: "dummy-cid",
}

_, ctx, err := setupRecorder(installEndpoint, req)
require.NoError(t, err)

err = api.Install(ctx)
err = srv.Install(ctx)
require.Error(t, err)

echoErr, ok := err.(*echo.HTTPError)
Expand All @@ -127,7 +125,7 @@ func TestAPI_FunctionInstall_HandlesErrors(t *testing.T) {

func TestAPI_InstallFunction_HandlesMalformedRequests(t *testing.T) {

api := setupAPI(t)
srv := setupAPI(t)

const (
wrongFieldType = `
Expand Down Expand Up @@ -183,7 +181,7 @@ func TestAPI_InstallFunction_HandlesMalformedRequests(t *testing.T) {
_, ctx, err := setupRecorder(installEndpoint, test.payload, prepare)
require.NoError(t, err)

err = api.Install(ctx)
err = srv.Install(ctx)
require.Error(t, err)

echoErr, ok := err.(*echo.HTTPError)
Expand Down
3 changes: 2 additions & 1 deletion api/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package api
import (
"context"

"github.com/blocklessnetworking/b7s/models/codes"
"github.com/blocklessnetworking/b7s/models/execute"
)

type Node interface {
ExecuteFunction(context.Context, execute.Request) (execute.Result, error)
ExecuteFunction(context.Context, execute.Request) (codes.Code, map[string]execute.Result, error)
ExecutionResult(id string) (execute.Result, bool)
PublishFunctionInstall(ctx context.Context, uri string, cid string) error
}
9 changes: 6 additions & 3 deletions api/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ import (
"net/http"

"github.com/labstack/echo/v4"

"github.com/blocklessnetworking/b7s/models/api/request"
)

// ExecutionResultRequest describes the payload for the REST API request for execution result.
type ExecutionResultRequest struct {
ID string `json:"id"`
}

// ExecutionResult implements the REST API endpoint for retrieving the result of a function execution.
func (a *API) ExecutionResult(ctx echo.Context) error {

// Get the request ID.
var request request.ExecutionResult
var request ExecutionResultRequest
err := ctx.Bind(&request)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, fmt.Errorf("could not unpack request: %w", err))
Expand Down
Loading

0 comments on commit df751a3

Please sign in to comment.