-
Notifications
You must be signed in to change notification settings - Fork 680
/
types.go
164 lines (142 loc) · 4.11 KB
/
types.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
//go:generate go-extpoints . AdapterFactory HttpHandler AdapterTransport LogRouter Job
package router
import (
"net"
"net/http"
"path"
"strings"
"time"
docker "github.com/fsouza/go-dockerclient"
)
// HTTPHandler is an extension type for adding HTTP endpoints
type HTTPHandler func() http.Handler
// AdapterFactory is an extension type for adding new log adapters
type AdapterFactory func(route *Route) (LogAdapter, error)
// AdapterTransport is an extension type for connection transports used by adapters
type AdapterTransport interface {
Dial(addr string, options map[string]string) (net.Conn, error)
}
// LogAdapter is a streamed log
type LogAdapter interface {
Stream(logstream chan *Message)
}
// Job is a thing to be done
type Job interface {
Run() error
Setup() error
Name() string
}
// LogRouter sends logs to LogAdapters via Routes
type LogRouter interface {
RoutingFrom(containerID string) bool
Route(route *Route, logstream chan *Message)
}
// RouteStore is a collections of Routes
type RouteStore interface {
Get(id string) (*Route, error)
GetAll() ([]*Route, error)
Add(route *Route) error
Remove(id string) bool
}
// Message is a log messages
type Message struct {
Container *docker.Container
Source string
Data string
Time time.Time
}
// Route represents what subset of logs should go where
type Route struct {
ID string `json:"id"`
FilterID string `json:"filter_id,omitempty"`
FilterName string `json:"filter_name,omitempty"`
FilterSources []string `json:"filter_sources,omitempty"`
FilterLabels []string `json:"filter_labels,omitempty"`
Adapter string `json:"adapter"`
Address string `json:"address"`
Options map[string]string `json:"options,omitempty"`
adapter LogAdapter
closed bool
closer chan struct{}
closerRcv <-chan struct{} // used instead of closer when set
}
// AdapterType returns a route's adapter type string
func (r *Route) AdapterType() string {
return strings.Split(r.Adapter, "+")[0]
}
// AdapterTransport returns a route's adapter transport string
func (r *Route) AdapterTransport(dfault string) string {
parts := strings.Split(r.Adapter, "+")
if len(parts) > 1 {
return parts[1]
}
return dfault
}
// Closer returns a route's closerRcv
func (r *Route) Closer() <-chan struct{} {
if r.closerRcv != nil {
return r.closerRcv
}
return r.closer
}
// OverrideCloser sets a Route.closer to closer
func (r *Route) OverrideCloser(closer <-chan struct{}) {
r.closerRcv = closer
}
// Close sends true to a Route.closer
func (r *Route) Close() {
r.closer <- struct{}{}
}
func (r *Route) matchAll() bool {
if r.FilterID == "" && r.FilterName == "" && len(r.FilterSources) == 0 && len(r.FilterLabels) == 0 {
return true
}
return false
}
// MultiContainer returns whether the Route is matching multiple containers or not
func (r *Route) MultiContainer() bool {
return r.matchAll() || strings.Contains(r.FilterName, "*")
}
// MatchContainer returns whether the Route is responsible for a given container
func (r *Route) MatchContainer(id, name string, labels map[string]string) bool {
if r.matchAll() {
return true
}
if r.FilterID != "" && !strings.HasPrefix(id, r.FilterID) {
return false
}
match, err := path.Match(r.FilterName, name)
if err != nil || (r.FilterName != "" && !match) {
return false
}
for _, label := range r.FilterLabels {
labelParts := strings.SplitN(label, ":", 2)
if len(labelParts) > 1 {
labelKey := labelParts[0]
labelValue := labelParts[1]
labelMatch, labelErr := path.Match(labelValue, labels[labelKey])
if labelErr != nil || (labelValue != "" && !labelMatch) {
return false
}
}
}
return true
}
// MatchMessage returns whether the Route is responsible for a given Message
func (r *Route) MatchMessage(message *Message) bool {
if r.matchAll() {
return true
}
if len(r.FilterSources) > 0 && !contains(r.FilterSources, message.Source) {
return false
}
return true
}
func contains(strs []string, str string) bool {
for _, s := range strs {
if s == str {
return true
}
}
return false
}