Skip to content

Commit

Permalink
Update execute to pass bytes instead of parsed enity
Browse files Browse the repository at this point in the history
  • Loading branch information
fr0stylo committed Nov 6, 2023
1 parent 6e0cb0a commit b4cf1a1
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 36 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@ all:

build: fs all unet

fs:
fs:
sudo rm -rf ./fs
cp -r ${shell docker image inspect alpine:latest -f \ {{.GraphDriver.Data.UpperDir}}} ./fs
echo "168.0.0.1 host.funcgo.internal" >> ./fs/etc/resolv.conf
28 changes: 11 additions & 17 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,23 @@ func main() {
case "container":
containerInit()
default:
mngr := runtime.NewFunction(&runtime.FunctionOpts{
mux := mux.NewRouter()
mux.Handle("/{id}", &apigw.Handler{Runner: runtime.NewFunction(&runtime.FunctionOpts{
MaxConcurrency: 10,
MainExec: "/etc/function",
RootFS: "./fs",
Files: runtime.FileList(
runtime.Files{From: "./bin/function", To: "/etc/function"},
),
})

mux := mux.NewRouter()
mux.Handle("/{id}", &apigw.Handler{Runner: mngr})
// for {
// reader := bufio.NewReader(os.Stdin)
// fmt.Print("Hit me\n")
// r, _ := reader.ReadString('\n')

// fmt.Printf("%s\n", r)
// if r == "exit" {
// break
// }

// go mngr.Execute(map[string]string{"Url": "/asdasda", "Body": "ok"})
// }
})}).Methods(http.MethodPost)
mux.Handle("/{id}", &apigw.Handler{Runner: runtime.NewFunction(&runtime.FunctionOpts{
MaxConcurrency: 10,
MainExec: "/etc/function",
RootFS: "./fs",
Files: runtime.FileList(
runtime.Files{From: "./bin/function2", To: "/etc/function"},
),
})}).Methods(http.MethodGet)

http.ListenAndServe("0.0.0.0:8000", mux)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/function/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func main() {
log.Info(t.Body)
b, _ := json.Marshal(t)
return &funcgo.Response{
StatusCode: 201,
StatusCode: 206,
Body: string(b),
}, nil
})
Expand Down
23 changes: 23 additions & 0 deletions cmd/function2/function.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package main

import (
"context"
"encoding/json"

"go.uber.org/zap"

"github.com/fr0stylo/funcgo/pkg/funcgo"
)

func main() {
funcgo.Handler(func(ctx context.Context, log *zap.SugaredLogger, t *funcgo.Request) (*funcgo.Response, error) {
log.Info(t.Body)
b, _ := json.Marshal(t)
return &funcgo.Response{
StatusCode: 201,
Body: string(b),
}, nil
})
}

type Typed interface{}
8 changes: 1 addition & 7 deletions pkg/apigw/gw.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

qparams := make(map[string][]string)
for n, v := range r.URL.Query() {
qparams[n] = v
}

req := funcgo.Request{
Params: mux.Vars(r),
QueryParams: r.URL.Query(),
Expand All @@ -50,8 +45,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

var fres funcgo.Response
j, _ := json.Marshal(res)
json.Unmarshal(j, &fres)
json.Unmarshal(res, &fres)

for n, h := range fres.Headers {
w.Header().Add(n, h)
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func NewFunction(opts *FunctionOpts) *Function {
}
}

func (r *Function) Execute(obj any) (any, error) {
func (r *Function) Execute(obj any) ([]byte, error) {
var w Runnable
for w = r.pool.GetAvailable(); w == nil; w = r.pool.GetAvailable() {
if r.maxConcurrency > r.pool.Size() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (r *Worker) setBusy() {
r.m.Lock()
}

func (r *Worker) Execute(obj any) (any, error) {
func (r *Worker) Execute(obj any) ([]byte, error) {
r.setBusy()
defer r.setNotBusy()
log.Printf("[%s] exec: started", r.name)
Expand Down
10 changes: 3 additions & 7 deletions pkg/runtime/worker_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net"
"net/http"

Expand All @@ -32,7 +33,7 @@ func NewWorkerApi(ip string) *WorkerApi {
}
}

func (r *WorkerApi) Execute(o any) (any, error) {
func (r *WorkerApi) Execute(o any) ([]byte, error) {
b, _ := json.Marshal(o)

res, err := r.client.Post(fmt.Sprintf("http://%s:9999", r.ip), "application/json", bytes.NewBuffer(b))
Expand All @@ -42,10 +43,5 @@ func (r *WorkerApi) Execute(o any) (any, error) {

defer res.Body.Close()
// Kills other types as it becomes map[string]interface
var body any
if err := json.NewDecoder(res.Body).Decode(&body); err != nil {
return nil, err
}

return body, nil
return io.ReadAll(res.Body)
}
2 changes: 1 addition & 1 deletion pkg/runtime/worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type WorkerPoolOpts struct {
}

type Runnable interface {
Execute(any) (any, error)
Execute(any) ([]byte, error)
}

func NewWorkerPool(opts *WorkerPoolOpts) *WorkerPool {
Expand Down

0 comments on commit b4cf1a1

Please sign in to comment.