-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathweb-hawk.go
289 lines (260 loc) · 8.41 KB
/
web-hawk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
package main
import (
"encoding/json"
"flag"
"fmt"
"github.com/dghubble/go-twitter/twitter"
"github.com/dghubble/oauth1"
"github.com/googollee/go-socket.io"
r "gopkg.in/dancannon/gorethink.v2"
"log"
"net/http"
"os"
"strconv"
"strings"
"time"
)
// Payload types for service monitoring. They are written to the DB by
// pushServerStatusToDb and JSON-encoded by the /up and /history handlers.
type (
	// ServiceStats is the result of probing a single monitored URL.
	ServiceStats struct {
		// Name is the URL after the configured cleaner substrings are removed.
		Name string `json:"Name"`
		// Alive is true only when the HEAD request succeeded with status 200.
		Alive bool `json:"Alive"`
		// URL is the address that was probed.
		URL string `json:"URL"`
		// Msec is the request duration in milliseconds (0 when not alive).
		Msec float64 `json:"Msec"`
	}

	// Status is one polling snapshot covering every monitored service.
	Status struct {
		// Timestamp is the snapshot time formatted as RFC 3339.
		Timestamp string `json:"Timestamp"`
		// Services holds one entry per monitored URL.
		Services []ServiceStats `json:"Services"`
	}
)
// Payload types for the /news endpoint.
type (
	// News is a single news item built from one tweet.
	News struct {
		// Timestamp is the tweet's creation time as reported by Twitter.
		Timestamp string `json:"Timestamp"`
		// Content is the tweet text.
		Content string `json:"Content"`
	}

	// AllNews is the JSON envelope returned by /news.
	AllNews struct {
		News []News `json:"News"`
	}
)
// Package-level settings; both are assigned by main once flags are parsed.
var (
	// cors points at the allowed CORS origin; an empty string makes addCors
	// skip the CORS headers.
	cors *string

	// urlCleaner lists the substrings getNameFromURL strips from a URL.
	urlCleaner []string
)
// main configures and runs the web-hawk service.
//
// It reads settings from flags/environment (see addConf), connects to
// RethinkDB, optionally builds a Twitter client, starts the socket.io server
// and the DB change broadcaster, starts the periodic service poller (unless
// POLL_TIME is 0), registers the /up, /history and /news HTTP endpoints and
// then blocks in http.ListenAndServe.
func main() {
	portPtr := addConf("PORT", "8080", "Port to host location service on.")
	urlsPtr := addConf("URLS", "http://localhost:7070/up, http://www.clianz.com/", "Comma seperated URLs list to monitor")
	corsPtr := addConf("CORS", "", "CORS URL to configure.")
	// NOTE(review): "DB_ADDDRESS" is misspelled, but it is the name existing
	// deployments set, so it is kept as-is for backward compatibility.
	dbAddressPtr := addConf("DB_ADDDRESS", "localhost:28015", "Address of RethinkDB instance")
	dbNamePtr := addConf("DB_NAME", "hawk", "Name of RethinkDB database")
	dbUsernamePtr := addConf("DB_USERNAME", "web-hawk", "Username of RethinkDB user")
	dbPasswordPtr := addConf("DB_PASSWORD", "hawkpassw0rd", "Password of RethinkDB user")
	pollTimePtr := addConf("POLL_TIME", "300", "Time (in seconds) between service status polls. '0' will disable server from polling.")
	urlCleanerPtr := addConf("URL_CLEANERS", "http://, https://, www.", "Part of URL to strip for converting to friendly name.")
	twitterPtr := addConf("TWITTER", "", "Comma separated list of Twitter params (consumerKey,consumerSecret,accessToken,accessSecret,username)")
	flag.Parse()

	cors = corsPtr
	log.Printf("Setting CORS: %v", *cors)
	urlCleaner = strings.Split(strings.Replace(*urlCleanerPtr, " ", "", -1), ",")

	dbSession, err := r.Connect(r.ConnectOpts{
		Address:  *dbAddressPtr,
		Database: *dbNamePtr,
		Username: *dbUsernamePtr,
		Password: *dbPasswordPtr,
	})
	if err != nil {
		log.Panic("Error connecting to DB.", err)
	}

	log.Printf("Monitoring URLs: %v", *urlsPtr)
	urls := strings.Split(strings.Replace(*urlsPtr, " ", "", -1), ",")
	client := http.Client{
		Timeout: 1 * time.Second,
	}

	// Twitter client. strings.Split never returns an empty slice (an empty
	// setting yields [""]), so the element count must be checked explicitly
	// before indexing; otherwise the default empty TWITTER value panics.
	const twitterParamCount = 5
	twitterConf := strings.Split(*twitterPtr, ",")
	var twitterClient *twitter.Client
	switch {
	case len(twitterConf) >= twitterParamCount:
		config := oauth1.NewConfig(twitterConf[0], twitterConf[1])
		token := oauth1.NewToken(twitterConf[2], twitterConf[3])
		httpClient := config.Client(oauth1.NoContext, token)
		twitterClient = twitter.NewClient(httpClient)
	case *twitterPtr != "":
		log.Printf("Ignoring TWITTER setting: expected %d comma separated values, got %d",
			twitterParamCount, len(twitterConf))
	}

	// Socket server
	socketServer, err := NewSocketServer(nil)
	if err != nil {
		log.Fatal(err)
	}
	socketServer.On("connection", func(so socketio.Socket) {
		// Every client joins the room that broadcastStatus publishes to.
		so.Join("updatesChannel")
	})
	socketServer.On("error", func(so socketio.Socket, err error) {
		log.Println("error:", err)
	})
	http.Handle("/socket.io/", socketServer)

	// Subscribe to DB and broadcast changes
	broadcastDbChanges(socketServer, dbSession)

	// Poll server to monitor periodically
	pollTime, err := strconv.ParseInt(*pollTimePtr, 10, 64)
	if err != nil || (pollTime > 0 && pollTime < 5) {
		panic("Service poll time invalid. Must be 5 seconds or more in interger value.")
	}
	if pollTime != 0 {
		monitorServiceStatus(dbSession, client, urls, pollTime)
	}

	// writeJSON encodes v onto the response and logs (rather than drops) any
	// encoding/write failure.
	writeJSON := func(w http.ResponseWriter, v interface{}) {
		if err := json.NewEncoder(w).Encode(v); err != nil {
			log.Printf("Error writing JSON response: %v", err)
		}
	}

	// Web handlers
	http.HandleFunc("/up", func(w http.ResponseWriter, _ *http.Request) {
		addCors(w)
		writeJSON(w, fetchServerStatusFromDb(dbSession))
	})
	http.HandleFunc("/history", func(w http.ResponseWriter, _ *http.Request) {
		addCors(w)
		writeJSON(w, fetchServerStatusHistoryFromDb(dbSession, pollTime))
	})
	http.HandleFunc("/news", func(w http.ResponseWriter, _ *http.Request) {
		addCors(w)
		if twitterClient == nil {
			fmt.Fprintf(w, "No News")
			return
		}
		params := &twitter.UserTimelineParams{ScreenName: twitterConf[4]}
		tweets, _, err := twitterClient.Timelines.UserTimeline(params)
		if err != nil {
			// A failing upstream call must not take the whole server down;
			// report it to this client only.
			log.Printf("Error fetching Twitter timeline: %v", err)
			http.Error(w, "Unable to fetch news", http.StatusBadGateway)
			return
		}
		allNews := AllNews{}
		for _, tweet := range tweets {
			log.Printf("TWEET ==> %v: %v", tweet.CreatedAt, tweet.Text)
			allNews.News = append(allNews.News, News{Timestamp: tweet.CreatedAt, Content: tweet.Text})
		}
		writeJSON(w, allNews)
	})

	// ListenAndServe only returns on failure, so announce the port first.
	log.Printf("Server running on port %v", *portPtr)
	err = http.ListenAndServe(":"+*portPtr, nil)
	if err != nil {
		log.Fatalf("Error: %s", err.Error())
	}
}
// Server wraps socketio.Server by embedding it, so that ServeHTTP can be
// overridden to add CORS headers before delegating to the socket server.
type Server struct {
socketio.Server
}
// NewSocketServer creates a socket.io server for the given transports and
// wraps it in Server so CORS headers are added to its HTTP responses.
// See: https://github.com/googollee/go-socket.io/issues/122
//
// It returns the error from socketio.NewServer unchanged.
func NewSocketServer(transportNames []string) (*Server, error) {
	inner, err := socketio.NewServer(transportNames)
	if err != nil {
		return nil, err
	}
	wrapped := Server{Server: *inner}
	return &wrapped, nil
}
// ServeHTTP adds the configured CORS headers to the response and then hands
// the request to the embedded socket.io server.
func (srv Server) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
	addCors(rw)
	inner := &srv.Server
	inner.ServeHTTP(rw, req)
}
// broadcastDbChanges subscribes to the change feed of the "hawk" table and, in
// a background goroutine, broadcasts the new value of every change to the
// socket clients via broadcastStatus.
//
// The process exits (log.Fatalf) if the feed cannot be opened. When the feed
// ends, the cursor is closed and any cursor error is logged.
//
// NOTE(review): the database name is hard-coded to "test" rather than taken
// from the DB_NAME setting — confirm this is intended.
func broadcastDbChanges(server *Server, dbSession *r.Session) {
	// Listen to DB for changes
	res, err := r.DB("test").Table("hawk").Changes().Run(dbSession)
	if err != nil {
		log.Fatalf("Error listening to DB changes: %s", err.Error())
	}
	go func(res *r.Cursor) {
		// Release the cursor once the feed stops instead of leaking it.
		defer res.Close()

		var value map[string]map[string]interface{}
		for res.Next(&value) {
			newStatus := value["new_val"]
			if newStatus == nil {
				// Nothing to broadcast when the change carries no new value.
				continue
			}
			statusByte, err := json.Marshal(newStatus)
			if err != nil {
				log.Printf("Error encoding DB change: %v", err)
				continue
			}
			broadcastStatus(server, string(statusByte))
		}
		// Next returns false on error as well as on exhaustion; surface why.
		if err := res.Err(); err != nil {
			log.Printf("DB change feed stopped: %v", err)
		}
	}(res)
}
// monitorServiceStatus probes all urls once immediately and stores the result,
// then repeats the probe every pollTime seconds from a background goroutine.
// The ticker is never stopped; polling continues until the process exits.
func monitorServiceStatus(session *r.Session, client http.Client, urls []string, pollTime int64) {
	poll := func() {
		pushServerStatusToDb(session, fetchServerStatus(client, urls))
	}

	poll()

	interval := time.Duration(pollTime) * time.Second
	ticker := time.NewTicker(interval)
	go func() {
		for {
			tick := <-ticker.C
			fmt.Println("Tick at", tick)
			poll()
		}
	}()
}
// broadcastStatus emits statusResp as an "updateEvent" to every socket in the
// "updatesChannel" room (the room each connection joins in main).
func broadcastStatus(server *Server, statusResp string) {
	const (
		room  = "updatesChannel"
		event = "updateEvent"
	)
	server.BroadcastTo(room, event, statusResp)
}
// pushServerStatusToDb inserts one status snapshot into the "hawk" table.
// A failed insert is logged and otherwise ignored.
func pushServerStatusToDb(session *r.Session, status Status) {
	insert := r.DB("test").Table("hawk").Insert(status)
	if err := insert.Exec(session); err != nil {
		log.Printf("Error writing to DB: %v", err)
	}
}
// fetchServerStatusFromDb returns the most recent status snapshot (highest
// Timestamp) from the "hawk" table.
//
// On any DB error the error is logged and the zero Status is returned.
func fetchServerStatusFromDb(session *r.Session) Status {
	var response Status
	resp, err := r.DB("test").Table("hawk").OrderBy(r.Desc("Timestamp")).Limit(1).Run(session)
	if err != nil {
		log.Printf("Error reading DB: %v", err)
		// The query failed, so there is no usable cursor to read from.
		return response
	}
	if err := resp.One(&response); err != nil {
		log.Printf("Error reading DB: %v", err)
	}
	return response
}
// fetchServerStatusHistoryFromDb returns up to the 48 most recent status
// snapshots from the "hawk" table, newest first.
//
// pollTime is currently unused; it is kept so the limit could be derived from
// the polling interval (e.g. one day's worth of polls) without changing
// callers. On any DB error the error is logged and whatever was decoded so far
// (possibly nil) is returned.
func fetchServerStatusHistoryFromDb(session *r.Session, pollTime int64) []Status {
	// limit := (24 * 60 * 60) / pollTime // One day worth of polls
	limit := 48 // Match this with webpage for best perf
	var response []Status
	resp, err := r.DB("test").Table("hawk").OrderBy(r.Desc("Timestamp")).Limit(limit).Run(session)
	if err != nil {
		log.Printf("Error reading DB: %v", err)
		// The query failed, so there is no usable cursor to read from.
		return response
	}
	if err := resp.All(&response); err != nil {
		log.Printf("Error reading DB: %v", err)
	}
	return response
}
// fetchServerStatus probes every URL concurrently with an HTTP HEAD request
// and returns a snapshot stamped with the current time (RFC 3339).
//
// A service is Alive only when the request succeeds with status 200; Msec is
// the request duration in milliseconds for alive services and 0 otherwise.
// Services are reported in the same order as urls. The call blocks until every
// probe has finished (each is bounded by client's timeout, if set).
func fetchServerStatus(client http.Client, urls []string) Status {
	// probe carries a result together with its position in urls so the output
	// order does not depend on which request finishes first.
	type probe struct {
		idx   int
		stats ServiceStats
	}

	// Buffered to len(urls) so no probing goroutine ever blocks on send.
	results := make(chan probe, len(urls))
	for i, eachURL := range urls {
		go func(idx int, v string) {
			stats := ServiceStats{Name: getNameFromURL(v), URL: v}
			startTime := time.Now()
			resp, err := client.Head(v)
			if err == nil {
				// Always release the response so the connection can be reused;
				// a HEAD response has no body worth reading.
				resp.Body.Close()
			}
			if err != nil || resp.StatusCode != http.StatusOK {
				log.Printf("Error: %v", v)
			} else {
				stats.Alive = true
				stats.Msec = time.Since(startTime).Seconds() * 1000
				log.Printf("Success (%.2f ms): %v", stats.Msec, v)
			}
			results <- probe{idx: idx, stats: stats}
		}(i, eachURL)
	}

	statusResp := Status{Timestamp: time.Now().Format(time.RFC3339)}
	if len(urls) > 0 {
		statusResp.Services = make([]ServiceStats, len(urls))
	}
	for range urls {
		p := <-results
		statusResp.Services[p.idx] = p.stats
	}
	return statusResp
}
// getNameFromURL derives a friendly service name from url by removing every
// occurrence of each urlCleaner entry, applied in order.
func getNameFromURL(url string) string {
	name := url
	for i := 0; i < len(urlCleaner); i++ {
		name = strings.Replace(name, urlCleaner[i], "", -1)
	}
	return name
}
// addConf registers a string command-line flag called name and returns the
// pointer to its value. If an environment variable with the same name is set
// to a non-empty value, that value replaces defaultVal as the initial value;
// a flag given on the command line still wins once flag.Parse runs.
func addConf(name, defaultVal, desc string) *string {
	value := flag.String(name, defaultVal, desc)
	if fromEnv := os.Getenv(name); fromEnv != "" {
		*value = fromEnv
	}
	return value
}
// addCors sets the CORS response headers when a CORS origin is configured.
// With an empty origin the response is left untouched.
func addCors(w http.ResponseWriter) {
	origin := *cors
	if origin == "" {
		return
	}
	headers := w.Header()
	headers.Set("Access-Control-Allow-Origin", origin)
	headers.Add("Access-Control-Allow-Credentials", "true")
}