Skip to content

Commit

Permalink
wip proxy - ok for http
Browse files Browse the repository at this point in the history
  • Loading branch information
ltearno committed Jul 21, 2020
1 parent 1c6c7c4 commit ded4a4b
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 35 deletions.
66 changes: 58 additions & 8 deletions src/my-own-cluster/apicore/core-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"my-own-cluster/enginejs"
"my-own-cluster/enginewasm"
"net/http"
"sync"
"time"
"unsafe"
)
Expand Down Expand Up @@ -283,18 +284,67 @@ func BetaWebProxy(ctx *common.FunctionExecutionContext, proxySpecJSON string) (i
return -1, err
}

res, err := GetUrl(ctx, spec.Url)
if err != nil {
return -2, err
}
/*
res, err := GetUrl(ctx, spec.Url)
if err != nil {
return -2, err
}
buffer := ctx.Orchestrator.GetExchangeBuffer(spec.OutputExchangeBufferID)
buffer.Write(res)
buffer := ctx.Orchestrator.GetExchangeBuffer(spec.OutputExchangeBufferID)
return 0, nil
*/

buffer.Write(res)
fmt.Printf("BETA PROXY\n")

fmt.Printf("DLKJDHLKJDHLKDJHLKDJHLDKJ HDLKJHDLKJ HDLKJH BETA PROXY\n")
req, resp, err := ctx.Orchestrator.CreateExchangeBuffersFromHttpClientRequest(spec.Method, spec.Url, spec.Headers)
if err != nil {
return -1, err
}

ctx.Orchestrator.CreateExchangeBuffersFromHttpClientRequest(spec.Method, spec.Url, spec.Headers)
input := ctx.Orchestrator.GetExchangeBuffer(spec.InputExchangeBufferID)
output := ctx.Orchestrator.GetExchangeBuffer(spec.OutputExchangeBufferID)

var wg sync.WaitGroup
wg.Add(2)

fmt.Printf("LAUNCH LOOPS\n")

go func() {
defer wg.Done()
for {
i := input.GetBuffer()
if i == nil || len(i) == 0 {
fmt.Printf("INPUT FINISHED\n")
req.Close()
return
}

fmt.Printf("READ %d FROM INPUT\n", len(i))
req.Write(i)
}
}()

go func() {
defer wg.Done()
for {
o := resp.GetBuffer()
if o == nil {
fmt.Printf("RESPONSE FINISHED\n")
output.Close()
return
}

fmt.Printf("READ %d FROM RESPONSE\n", len(o))
output.Write(o)
}
}()

wg.Wait()

fmt.Printf("LOOPS FINISHED\n")

return 0, nil
}
155 changes: 128 additions & 27 deletions src/my-own-cluster/common/exchangebuffer-http-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"crypto/tls"
"fmt"
"io"
"my-own-cluster/tools"
"net/http"

"github.com/golang-collections/go-datastructures/queue"
)

/*
Expand All @@ -19,35 +22,62 @@ req, err := NewRequest("POST", url, body)
to send data to the remote server
*/
type requestWrapper struct {
request *http.Request
headers map[string]string
headersSet bool
request *http.Request
headers map[string]string

queue *queue.Queue
availableBytes []byte
}

func newRequestWrapper(method string, url string, headers map[string]string) *requestWrapper {
func newRequestWrapper(method string, url string, headers map[string]string) (*requestWrapper, error) {
w := &requestWrapper{
headers: make(map[string]string),
headersSet: false,
headers: make(map[string]string),
queue: queue.New(1),
}

req, err := http.NewRequest(method, url, w)
if err != nil {
return nil
return nil, err
}

w.request = req

for k, v := range headers {
w.headers[k] = v
req.Header.Set(k, v)
}

return w
return w, nil
}

// called by http client when sending the request body
func (w *requestWrapper) Read(buffer []byte) (int, error) {
fmt.Printf("ADHGKADJHAGDKJHADGKJHADG THE THING CALLED OUR READ WRAPPER")
return 0, io.EOF
fmt.Printf("rw read\n")
for {
if w.availableBytes != nil {
// first purge available bytes
toGive := tools.Min(len(buffer), len(w.availableBytes))
copy(buffer[0:toGive], w.availableBytes[0:toGive])
w.availableBytes = w.availableBytes[toGive:]
if len(w.availableBytes) == 0 {
w.availableBytes = nil
}
fmt.Printf("rw give %d bytes\n", toGive)
return toGive, nil
} else if w.queue.Disposed() {
// if finished, say it
fmt.Printf("rw says EOF\n")
return 0, io.EOF
} else {
// or else wait for next available bytes
fmt.Printf("rw waits on queue\n")
b, _ := w.queue.Get(1)
fmt.Printf("rw wait is finished with %v\n", b)
if b != nil && len(b) > 0 && len(b[0].([]byte)) > 0 {
w.availableBytes = b[0].([]byte)
}
}
}
}

func (w *requestWrapper) GetHeader(name string) (string, bool) {
Expand Down Expand Up @@ -84,35 +114,96 @@ func (w *requestWrapper) WriteStatusCode(statusCode int) {
}

func (w *requestWrapper) Write(buffer []byte) (int, error) {
w.ensureHeadersSet()
o := make([]byte, len(buffer))
copy(o, buffer)
w.queue.Put(o)
return len(buffer), nil
}

func (w *requestWrapper) Close() int {
return -1
w.queue.Dispose()
return 0
}

func (w *requestWrapper) ensureHeadersSet() {
if !w.headersSet {
w.headersSet = true
/*
to receive data from the remote server
*/
type responseWrapper struct {
response *http.Response
headers map[string]string
}

func newResponseWrapper(r *http.Response) (*responseWrapper, error) {
w := &responseWrapper{
headers: make(map[string]string),
response: r,
}

for k, v := range w.headers {
w.request.Header.Set(k, v)
for k, v := range r.Header {
if len(v) != 1 {
fmt.Printf("WARNING : header '%s' has %d values, skipping all but the first...\n", k, len(v))
}

if len(v) > 0 {
w.headers[k] = v[0]
}
}

return w, nil
}

/*
to receive data from the remote server
*/
type responseWrapper struct {
headers map[string]string
func (w *responseWrapper) GetHeader(name string) (string, bool) {
h, ok := w.headers[name]
return h, ok
}

func (w *responseWrapper) GetHeadersCount() int {
return len(w.headers)
}

func (w *responseWrapper) GetHeaders(cb func(name string, value string)) {
for k, v := range w.headers {
cb(k, v)
}
}

func (o *Orchestrator) CreateExchangeBuffersFromHttpClientRequest(method string, url string, headers map[string]string) {
rw := newRequestWrapper(method, url, headers)
if rw == nil {
return
func (w *responseWrapper) GetStatusCode() int {
return w.response.StatusCode
}

func (w *responseWrapper) GetBuffer() []byte {
buffer := make([]byte, 1024*1024)
n, err := w.response.Body.Read(buffer)
if n > 0 {
return buffer[0:n]
}
if err != nil {
fmt.Printf("Error while reading http response '%v'\n", err)
}
return nil
}

func (w *responseWrapper) SetHeader(name string, value string) {
w.headers[name] = value
}

func (w *responseWrapper) WriteStatusCode(statusCode int) {
fmt.Printf("ERROR cannot call WriteStatusCode on client response wrapper\n")
}

func (w *responseWrapper) Write(buffer []byte) (int, error) {
return -1, fmt.Errorf("ERROR cannot call WriteStatusCode on client response wrapper")
}

func (w *responseWrapper) Close() int {
w.response.Body.Close()
return 0
}

func (o *Orchestrator) CreateExchangeBuffersFromHttpClientRequest(method string, url string, headers map[string]string) (ExchangeBuffer, ExchangeBuffer, error) {
requestWrapper, err := newRequestWrapper(method, url, headers)
if err != nil {
return nil, nil, err
}

client := &http.Client{Transport: &http.Transport{
Expand All @@ -121,5 +212,15 @@ func (o *Orchestrator) CreateExchangeBuffersFromHttpClientRequest(method string,
},
}}

client.Do(rw.request)
response, err := client.Do(requestWrapper.request)
if err != nil {
return nil, nil, err
}

responseWrapper, err := newResponseWrapper(response)
if err != nil {
return nil, nil, err
}

return requestWrapper, responseWrapper, nil
}
4 changes: 4 additions & 0 deletions src/my-own-cluster/common/exchangebuffer-http-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func (b *HttpReaderExchangeBuffer) GetStatusCode() int {
func (b *HttpReaderExchangeBuffer) GetBuffer() []byte {
body, err := ioutil.ReadAll(b.r.Body)
if err == nil {
if body != nil && len(body) == 0 {
body = nil
}

return body
} else {
fmt.Printf("ERROR http wrapped request CANNOT READ BODY, maybe tried to read it twice ? (%v)\n", err)
Expand Down
1 change: 1 addition & 0 deletions update-dependencies.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ go get github.com/ltearno/go-wasm3
go get gopkg.in/ltearno/go-duktape.v3
go get golang.org/x/sys/unix
go get github.com/gorilla/websocket
go get github.com/golang-collections/go-datastructures/queue

exit 0

0 comments on commit ded4a4b

Please sign in to comment.