Skip to content
This repository was archived by the owner on Feb 27, 2022. It is now read-only.

Commit 104e94e

Browse files
author
Reto Lehmann
committed
Added multiple clusters, routes, and refactorings.
1 parent a556326 commit 104e94e

File tree

13 files changed

+358
-281
lines changed

13 files changed

+358
-281
lines changed

api/endpoints.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
11
package api
2-

api/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package api
22

33
import (
4+
"github.com/gin-contrib/cors"
45
"github.com/gin-gonic/gin"
56
"github.com/golang/glog"
6-
"github.com/gin-contrib/cors"
77
)
88

99
func init() {

balancer/cluster.go

Lines changed: 0 additions & 170 deletions
This file was deleted.

balancer/core.go

Lines changed: 0 additions & 17 deletions
This file was deleted.

balancer/core/core.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package core
2+
3+
import "net"
4+
5+
type Context struct {
6+
Hostname string
7+
Conn net.Conn
8+
}
9+
10+
type ReadWriteCount struct {
11+
CountRead uint
12+
CountWrite uint
13+
}
14+
15+
func (rwc ReadWriteCount) IsZero() bool {
16+
return rwc.CountRead == 0 && rwc.CountWrite == 0
17+
}

balancer/core/httpreader.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package core
2+
3+
import (
4+
"bufio"
5+
"bytes"
6+
"net/http"
7+
)
8+
9+
// HttpHostHeader returns the HTTP Host header from br without
10+
// consuming any of its bytes. It returns "" if it can't find one.
11+
func HttpHostHeader(br *bufio.Reader) string {
12+
const maxPeek = 4 << 10
13+
peekSize := 0
14+
for {
15+
peekSize++
16+
if peekSize > maxPeek {
17+
b, _ := br.Peek(br.Buffered())
18+
return httpHostHeaderFromBytes(b)
19+
}
20+
b, err := br.Peek(peekSize)
21+
if n := br.Buffered(); n > peekSize {
22+
b, _ = br.Peek(n)
23+
peekSize = n
24+
}
25+
if len(b) > 0 {
26+
if b[0] < 'A' || b[0] > 'Z' {
27+
// Doesn't look like an HTTP verb
28+
// (GET, POST, etc).
29+
return ""
30+
}
31+
if bytes.Index(b, crlfcrlf) != -1 || bytes.Index(b, lflf) != -1 {
32+
req, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(b)))
33+
if err != nil {
34+
return ""
35+
}
36+
if len(req.Header["Host"]) > 1 {
37+
return ""
38+
}
39+
return req.Host
40+
}
41+
}
42+
if err != nil {
43+
return httpHostHeaderFromBytes(b)
44+
}
45+
}
46+
}
47+
48+
var (
49+
lfHostColon = []byte("\nHost:")
50+
lfhostColon = []byte("\nhost:")
51+
crlf = []byte("\r\n")
52+
lf = []byte("\n")
53+
crlfcrlf = []byte("\r\n\r\n")
54+
lflf = []byte("\n\n")
55+
)
56+
57+
func httpHostHeaderFromBytes(b []byte) string {
58+
if i := bytes.Index(b, lfHostColon); i != -1 {
59+
return string(bytes.TrimSpace(untilEOL(b[i+len(lfHostColon):])))
60+
}
61+
if i := bytes.Index(b, lfhostColon); i != -1 {
62+
return string(bytes.TrimSpace(untilEOL(b[i+len(lfhostColon):])))
63+
}
64+
return ""
65+
}
66+
67+
// untilEOL returns v, truncated before the first '\n' byte, if any.
68+
// The returned slice may include a '\r' at the end.
69+
func untilEOL(v []byte) []byte {
70+
if i := bytes.IndexByte(v, '\n'); i != -1 {
71+
return v[:i]
72+
}
73+
return v
74+
}

balancer/proxy.go renamed to balancer/core/proxy.go

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,23 @@
1-
package balancer
1+
package core
22

33
import (
44
"io"
55
"net"
6+
67
"time"
78

89
log "github.com/sirupsen/logrus"
910
)
1011

1112
const (
12-
1313
/* Buffer size to handle data from socket */
1414
BUFFER_SIZE = 16 * 1024
1515

16-
/* Interval of pushing aggregated read/write stats */
16+
/* Interval of pushing r/w stats */
1717
PROXY_STATS_PUSH_INTERVAL = 1 * time.Second
1818
)
1919

20-
/**
21-
* Perform copy/proxy data from 'from' to 'to' socket, counting r/w stats and
22-
* dropping connection if timeout exceeded
23-
*/
24-
func proxy(to net.Conn, from net.Conn, timeout time.Duration) <-chan ReadWriteCount {
25-
20+
func Proxy(to net.Conn, from net.Conn, timeout time.Duration) <-chan ReadWriteCount {
2621
stats := make(chan ReadWriteCount)
2722
outStats := make(chan ReadWriteCount)
2823

@@ -74,8 +69,7 @@ func proxy(to net.Conn, from net.Conn, timeout time.Duration) <-chan ReadWriteCo
7469

7570
// Run proxy copier
7671
go func() {
77-
err := Copy(to, from, stats)
78-
// hack to determine normal close. TODO: fix when it will be exposed in golang
72+
err := copy(to, from, stats)
7973
e, ok := err.(*net.OpError)
8074
if err != nil && (!ok || e.Err.Error() != "use of closed network connection") {
8175
log.Warn(err)
@@ -84,18 +78,13 @@ func proxy(to net.Conn, from net.Conn, timeout time.Duration) <-chan ReadWriteCo
8478
to.Close()
8579
from.Close()
8680

87-
// Stop stats collecting goroutine
8881
close(stats)
8982
}()
9083

9184
return outStats
9285
}
9386

94-
/**
95-
* It's build by analogy of io.Copy
96-
*/
97-
func Copy(to io.Writer, from io.Reader, ch chan<- ReadWriteCount) error {
98-
87+
func copy(to io.Writer, from io.Reader, ch chan<- ReadWriteCount) error {
9988
buf := make([]byte, BUFFER_SIZE)
10089
var err error = nil
10190

balancer/healthcheck.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func NewHealthCheck(ip string, status chan HealthCheckResult, checkInterval time
3030
}
3131

3232
func (hc *HealthCheck) Start() {
33-
log.Infof("Starting health checks for %v", hc.routerHostIP)
33+
log.Infof("Starting health checks for router host %v", hc.routerHostIP)
3434

3535
hc.ticker = *time.NewTicker(hc.interval)
3636

@@ -52,7 +52,7 @@ func (hc *HealthCheck) Start() {
5252
}
5353

5454
func (hc *HealthCheck) Stop() {
55-
log.Infof("Stopping health checks for %v", hc.routerHostIP)
55+
log.Infof("Stopping health checks for router host %v", hc.routerHostIP)
5656
hc.stop <- true
5757
}
5858

@@ -67,7 +67,7 @@ func checkRouterHost(routerHostIp string, result chan<- HealthCheckResult) {
6767
conn.Close()
6868
}
6969

70-
// Tell the cluster about the health result
70+
// Tell the balancer about the health result
7171
result <- HealthCheckResult{
7272
routerHostIP: routerHostIp,
7373
healthy: healthy,

0 commit comments

Comments
 (0)