Skip to content

Commit eaf3fd3

Browse files
authored
add http/grpc header support for py/js functions (#40)
* add http/grpc header support for py/js functions * fix review comments * fix tests for raspberry | bypass special chars in request headers * the missing tests for the headers func + update sieve func
1 parent 441ce0f commit eaf3fd3

File tree

14 files changed

+206
-18
lines changed

14 files changed

+206
-18
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ Additionally, we provide scripts to read logs from your function and to wipe all
9090

9191
This tinyFaaS prototype only supports functions written for NodeJS 20, Python 3.9, and binary functions.
9292
A good place to get started with writing functions is the selection of test functions in [`./test/fns`](./test/fns/).
93+
HTTP headers and GRPC Metadata are accessible in NodeJS and Python functions as key values. Check the "show-headers" test functions for more information.
9394

9495
#### NodeJS 20
9596

pkg/coap/coap.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"github.com/pfandzelter/go-coap"
99
)
1010

11+
const async = false
12+
1113
func Start(r *rproxy.RProxy, listenAddr string) {
1214

1315
h := coap.FuncHandler(
@@ -17,8 +19,6 @@ func Start(r *rproxy.RProxy, listenAddr string) {
1719
log.Printf("is confirmable: %v", m.IsConfirmable())
1820
log.Printf("path: %s", m.PathString())
1921

20-
async := false
21-
2222
p := m.PathString()
2323

2424
for p != "" && p[0] == '/' {
@@ -27,7 +27,7 @@ func Start(r *rproxy.RProxy, listenAddr string) {
2727

2828
log.Printf("have request for path: %s (async: %v)", p, async)
2929

30-
s, res := r.Call(p, m.Payload, async)
30+
s, res := r.Call(p, m.Payload, async, nil)
3131

3232
mes := &coap.Message{
3333
Type: coap.Acknowledgement,

pkg/docker/runtimes/python3/functionhandler.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,11 @@ def do_POST(self) -> None:
3232
if d == "":
3333
d = None
3434

35+
# Read headers into a dictionary
36+
headers: typing.Dict[str, str] = {k: v for k, v in self.headers.items()}
37+
3538
try:
36-
res = fn.fn(d)
39+
res = fn.fn(d, headers)
3740
self.send_response(200)
3841
self.end_headers()
3942
if res is not None:

pkg/grpc/grpc.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/OpenFogStack/tinyFaaS/pkg/grpc/tinyfaas"
1010
"github.com/OpenFogStack/tinyFaaS/pkg/rproxy"
1111
"google.golang.org/grpc"
12+
"google.golang.org/grpc/metadata"
1213
)
1314

1415
// GRPCServer is the grpc endpoint for this tinyFaaS instance.
@@ -21,7 +22,21 @@ func (gs *GRPCServer) Request(ctx context.Context, d *tinyfaas.Data) (*tinyfaas.
2122

2223
log.Printf("have request for path: %s (async: %v)", d.FunctionIdentifier, false)
2324

24-
s, res := gs.r.Call(d.FunctionIdentifier, []byte(d.Data), false)
25+
// Extract metadata from the gRPC context
26+
md, ok := metadata.FromIncomingContext(ctx)
27+
headers := make(map[string]string)
28+
if ok {
29+
// Convert metadata to map[string]string
30+
for k, v := range md {
31+
if len(v) > 0 {
32+
headers[k] = v[0]
33+
}
34+
}
35+
} else {
36+
log.Print("failed to extract metadata from context, using empty headers GRPC request")
37+
}
38+
39+
s, res := gs.r.Call(d.FunctionIdentifier, []byte(d.Data), false, headers)
2540

2641
switch s {
2742
case rproxy.StatusOK:

pkg/http/http.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,12 @@ func Start(r *rproxy.RProxy, listenAddr string) {
3131
return
3232
}
3333

34-
s, res := r.Call(p, req_body, async)
34+
headers := make(map[string]string)
35+
for k, v := range req.Header {
36+
headers[k] = v[0]
37+
}
38+
39+
s, res := r.Call(p, req_body, async, headers)
3540

3641
switch s {
3742
case rproxy.StatusOK:

pkg/rproxy/rproxy.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"log"
88
"math/rand"
99
"net/http"
10+
"regexp"
1011
"sync"
1112
)
1213

@@ -59,7 +60,7 @@ func (r *RProxy) Del(name string) error {
5960
return nil
6061
}
6162

62-
func (r *RProxy) Call(name string, payload []byte, async bool) (Status, []byte) {
63+
func (r *RProxy) Call(name string, payload []byte, async bool, headers map[string]string) (Status, []byte) {
6364

6465
handler, ok := r.hosts[name]
6566

@@ -75,27 +76,33 @@ func (r *RProxy) Call(name string, payload []byte, async bool) (Status, []byte)
7576

7677
log.Printf("chosen handler: %s", h)
7778

78-
// call function
79+
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s:8000/fn", h), bytes.NewBuffer(payload))
80+
if err != nil {
81+
log.Print(err)
82+
return StatusError, nil
83+
}
84+
for k, v := range headers {
85+
cleanedKey := cleanHeaderKey(k) // remove special chars from key
86+
req.Header.Set(cleanedKey, v)
87+
}
88+
89+
// call function asynchronously
7990
if async {
8091
log.Printf("async request accepted")
8192
go func() {
82-
resp, err := http.Post(fmt.Sprintf("http://%s:8000/fn", h), "application/binary", bytes.NewBuffer(payload))
83-
84-
if err != nil {
93+
resp, err2 := http.DefaultClient.Do(req)
94+
if err2 != nil {
8595
return
8696
}
87-
8897
resp.Body.Close()
89-
9098
log.Printf("async request finished")
9199
}()
92100
return StatusAccepted, nil
93101
}
94102

95103
// call function and return results
96104
log.Printf("sync request starting")
97-
resp, err := http.Post(fmt.Sprintf("http://%s:8000/fn", h), "application/binary", bytes.NewBuffer(payload))
98-
105+
resp, err := http.DefaultClient.Do(req)
99106
if err != nil {
100107
log.Print(err)
101108
return StatusError, nil
@@ -115,3 +122,9 @@ func (r *RProxy) Call(name string, payload []byte, async bool) (Status, []byte)
115122

116123
return StatusOK, res_body
117124
}
125+
func cleanHeaderKey(key string) string {
126+
// a regex pattern to match special characters
127+
re := regexp.MustCompile(`[:()<>@,;:\"/[\]?={} \t]`)
128+
// Replace special characters with an empty string
129+
return re.ReplaceAllString(key, "")
130+
}

test/fns/echo-js/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11

22
module.exports = (req, res) => {
33
const response = req.body;
4+
const headers = req.headers; // headers from the http request or GRPC metadata
45

56
console.log(response);
67

test/fns/echo/fn.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22

33
import typing
44

5-
def fn(input: typing.Optional[str]) -> typing.Optional[str]:
5+
def fn(input: typing.Optional[str], headers: typing.Optional[typing.Dict[str, str]]) -> typing.Optional[str]:
66
"""echo the input"""
77
return input

test/fns/show-headers-js/index.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
2+
module.exports = (req, res) => {
3+
const body = req.body;
4+
const headers = req.headers; // headers from the http request or GRPC metadata
5+
6+
console.log("Headers:", headers);
7+
8+
res.send(headers);
9+
}

test/fns/show-headers-js/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"name": "fn", "version": "1.0.0", "main": "index.js"}

test/fns/show-headers/fn.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import json
2+
import typing
3+
4+
def fn(input: typing.Optional[str], headers: typing.Optional[typing.Dict[str, str]]) -> typing.Optional[str]:
5+
"""echo the input"""
6+
if headers is not None:
7+
return json.dumps(headers)
8+
else:
9+
return "{}"

test/fns/show-headers/requirements.txt

Whitespace-only changes.

test/fns/sieve-of-eratosthenes/index.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ module.exports = (req, res) => {
1212
}
1313
}
1414

15-
response = ("Found " + primes.length + " primes under " + max);
15+
let response = ("Found " + primes.length + " primes under " + max);
1616

1717
console.log(response);
1818

test/test_all.py

Lines changed: 132 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import unittest
44

5+
import json
56
import os
67
import os.path as path
78
import signal
@@ -48,7 +49,7 @@ def setUpModule() -> None:
4849
uname = os.uname()
4950
if uname.machine == "x86_64":
5051
arch = "amd64"
51-
elif uname.machine == "arm64":
52+
elif uname.machine == "arm64" or uname.machine == "aarch64":
5253
arch = "arm64"
5354
else:
5455
raise Exception(f"Unsupported architecture: {uname.machine}")
@@ -534,6 +535,136 @@ def test_invoke_grpc(self) -> None:
534535
self.assertIsNotNone(response)
535536
self.assertEqual(response.response, payload)
536537

538+
class TestShowHeadersJS(TinyFaaSTest):
539+
fn = ""
540+
541+
@classmethod
542+
def setUpClass(cls) -> None:
543+
super(TestShowHeadersJS, cls).setUpClass()
544+
cls.fn = startFunction(path.join(fn_path, "show-headers-js"), "headersjs", "nodejs", 1)
545+
546+
def setUp(self) -> None:
547+
super(TestShowHeadersJS, self).setUp()
548+
self.fn = TestShowHeadersJS.fn
549+
550+
def test_invoke_http(self) -> None:
551+
"""invoke a function"""
552+
553+
# make a request to the function with a custom headers
554+
req = urllib.request.Request(
555+
f"http://{self.host}:{self.http_port}/{self.fn}",
556+
headers={"lab": "scalable_software_systems_group"},
557+
)
558+
559+
res = urllib.request.urlopen(req, timeout=10)
560+
561+
# check the response
562+
self.assertEqual(res.status, 200)
563+
response_body = res.read().decode("utf-8")
564+
response_json = json.loads(response_body)
565+
self.assertIn("lab", response_json)
566+
self.assertEqual(response_json["lab"], "scalable_software_systems_group") # custom header
567+
self.assertIn("user-agent", response_json)
568+
self.assertEqual(response_json["user-agent"], "Python-urllib/3.11") # python client
569+
570+
return
571+
572+
# def test_invoke_coap(self) -> None: # CoAP does not support headers
573+
574+
575+
def test_invoke_grpc(self) -> None:
576+
"""invoke a function"""
577+
try:
578+
import grpc
579+
except ImportError:
580+
self.skipTest(
581+
"grpc is not installed -- if you want to run gRPC tests, install the dependencies in requirements.txt"
582+
)
583+
584+
import tinyfaas_pb2
585+
import tinyfaas_pb2_grpc
586+
587+
# make a request to the function with a payload
588+
payload = ""
589+
metadata = (("lab", "scalable_software_systems_group"),)
590+
591+
with grpc.insecure_channel(f"{self.host}:{self.grpc_port}") as channel:
592+
stub = tinyfaas_pb2_grpc.TinyFaaSStub(channel)
593+
response = stub.Request(
594+
tinyfaas_pb2.Data(functionIdentifier=self.fn, data=payload), metadata=metadata
595+
)
596+
597+
response_json = json.loads(response.response)
598+
self.assertIn("lab", response_json)
599+
self.assertEqual(response_json["lab"], "scalable_software_systems_group") # custom header
600+
self.assertIn("user-agent", response_json)
601+
self.assertIn("grpc-python/1.64.1", response_json["user-agent"]) # client header
602+
603+
class TestShowHeaders(TinyFaaSTest): # Note: In Python, the http.server module (and many other HTTP libraries) automatically capitalizes the first character of each word in the header keys.
604+
fn = ""
605+
606+
@classmethod
607+
def setUpClass(cls) -> None:
608+
super(TestShowHeaders, cls).setUpClass()
609+
cls.fn = startFunction(path.join(fn_path, "show-headers"), "headers", "python3", 1)
610+
611+
def setUp(self) -> None:
612+
super(TestShowHeaders, self).setUp()
613+
self.fn = TestShowHeaders.fn
614+
615+
def test_invoke_http(self) -> None:
616+
"""invoke a function"""
617+
618+
# make a request to the function with a custom headers
619+
req = urllib.request.Request(
620+
f"http://{self.host}:{self.http_port}/{self.fn}",
621+
headers={"Lab": "scalable_software_systems_group"},
622+
)
623+
624+
res = urllib.request.urlopen(req, timeout=10)
625+
626+
# check the response
627+
self.assertEqual(res.status, 200)
628+
response_body = res.read().decode("utf-8")
629+
response_json = json.loads(response_body)
630+
self.assertIn("Lab", response_json)
631+
self.assertEqual(response_json["Lab"], "scalable_software_systems_group") # custom header
632+
self.assertIn("User-Agent", response_json)
633+
self.assertIn("Python-urllib", response_json["User-Agent"]) # python client
634+
635+
return
636+
637+
# def test_invoke_coap(self) -> None: # CoAP does not support headers, instead you have
638+
639+
def test_invoke_grpc(self) -> None:
640+
"""invoke a function"""
641+
try:
642+
import grpc
643+
except ImportError:
644+
self.skipTest(
645+
"grpc is not installed -- if you want to run gRPC tests, install the dependencies in requirements.txt"
646+
)
647+
648+
import tinyfaas_pb2
649+
import tinyfaas_pb2_grpc
650+
651+
# make a request to the function with a payload
652+
payload = ""
653+
metadata = (("lab", "scalable_software_systems_group"),)
654+
655+
with grpc.insecure_channel(f"{self.host}:{self.grpc_port}") as channel:
656+
stub = tinyfaas_pb2_grpc.TinyFaaSStub(channel)
657+
response = stub.Request(
658+
tinyfaas_pb2.Data(functionIdentifier=self.fn, data=payload), metadata=metadata
659+
)
660+
661+
response_json = json.loads(response.response)
662+
663+
self.assertIn("Lab", response_json)
664+
self.assertEqual(response_json["Lab"], "scalable_software_systems_group") # custom header
665+
self.assertIn("User-Agent", response_json)
666+
self.assertIn("grpc-python", response_json["User-Agent"]) # client header
667+
537668

538669
if __name__ == "__main__":
539670
# check that make is installed

0 commit comments

Comments
 (0)