File tree Expand file tree Collapse file tree 3 files changed +87
-8
lines changed Expand file tree Collapse file tree 3 files changed +87
-8
lines changed Original file line number Diff line number Diff line change
1
+ package event
2
+
3
+ import (
4
+ "time"
5
+ )
6
+
7
+ type Event struct {
8
+ Ts time.Time `json:"ts"`
9
+ Record map [string ]interface {} `json:"record"`
10
+ }
11
+
12
+ func New (ts time.Time , record map [string ]interface {}) Event {
13
+ return Event {
14
+ Ts : ts ,
15
+ Record : record ,
16
+ }
17
+ }
18
+
19
+ type Events []Event
20
+
21
+ func (e Events ) Len () int {
22
+ return len (e )
23
+ }
24
+
25
+ func (e Events ) Less (i , j int ) bool {
26
+ return e [i ].Ts .After (e [j ].Ts )
27
+ }
28
+
29
+ func (e Events ) Swap (i , j int ) {
30
+ e [i ], e [j ] = e [j ], e [i ]
31
+ }
Original file line number Diff line number Diff line change @@ -2,18 +2,15 @@ package main
2
2
3
3
import (
4
4
"fmt"
5
+ "net/http"
5
6
"os"
6
- "time"
7
7
8
+ "github.com/factorysh/fluent-server/mirror"
8
9
"github.com/factorysh/fluent-server/server"
9
10
)
10
11
11
- func handler (tag string , ts * time.Time , record map [string ]interface {}) error {
12
- fmt .Println (tag , ts , record )
13
- return nil
14
- }
15
-
16
12
func main () {
13
+ m := mirror .New ()
17
14
var s * server.Server
18
15
caCrt := os .Getenv ("CA_CRT" )
19
16
if caCrt != "" {
@@ -26,9 +23,14 @@ ca.crt: %s
26
23
server.crt: %s
27
24
server.key: %s
28
25
` , caCrt , os .Getenv ("SRV_CRT" ), os .Getenv ("SRV_KEY" ))
29
- s = server .NewTLS (handler , cfg )
26
+ s = server .NewTLS (m . Handler , cfg )
30
27
} else {
31
- s = server .New (handler )
28
+ s = server .New (m .Handler )
29
+ }
30
+ ll := os .Getenv ("MIRROR_LISTEN" )
31
+ if ll != "" {
32
+ go http .ListenAndServe (ll , m )
33
+ fmt .Println ("mirror listen " , ll )
32
34
}
33
35
34
36
l := os .Getenv ("LISTEN" )
Original file line number Diff line number Diff line change
1
+ package mirror
2
+
3
+ import (
4
+ "encoding/json"
5
+ "fmt"
6
+ "net/http"
7
+ "sync"
8
+ "time"
9
+
10
+ "github.com/factorysh/fluent-server/event"
11
+ )
12
+
13
+ type Mirror struct {
14
+ lock * sync.Mutex
15
+ events map [string ]event.Events
16
+ }
17
+
18
+ func New () * Mirror {
19
+ return & Mirror {
20
+ lock : & sync.Mutex {},
21
+ events : make (map [string ]event.Events ),
22
+ }
23
+ }
24
+
25
+ func (t * Mirror ) Handler (tag string , ts * time.Time , record map [string ]interface {}) error {
26
+ fmt .Println (tag , ts , record )
27
+ t .lock .Lock ()
28
+ defer t .lock .Unlock ()
29
+ evts , ok := t .events [tag ]
30
+ if ! ok {
31
+ t .events [tag ] = event.Events {
32
+ event .New (* ts , record ),
33
+ }
34
+ } else {
35
+ t .events [tag ] = append (evts , event .New (* ts , record ))
36
+ }
37
+ return nil
38
+ }
39
+
40
+ func (t * Mirror ) ServeHTTP (w http.ResponseWriter , r * http.Request ) {
41
+ w .Header ().Set ("Content-Type" , "application/json" )
42
+ err := json .NewEncoder (w ).Encode (t .events )
43
+ if err != nil {
44
+ panic (err )
45
+ }
46
+ }
You can’t perform that action at this time.
0 commit comments