-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
128 lines (104 loc) · 2.54 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package main
import (
"container/heap"
"fmt"
// "encoding/json"
"log"
"net"
"time"
"github.com/gorilla/websocket"
)
// StockData is a single update for one stock. Values of this type travel
// over the data channel, are ordered by PriorityQueue, and are written to
// subscribed WebSocket clients with WriteJSON.
type StockData struct {
	// StockName identifies the stock. It is the key looked up in each
	// client's subscribedStocks set to decide who receives the update.
	StockName string
	// Data is the update payload, carried as a plain string.
	Data string
	// Timestamp is the value PriorityQueue.Less compares to order updates.
	Timestamp time.Time
}
// PriorityQueue is a heap of stock updates ordered by Timestamp, newest
// first. It implements heap.Interface, so it is meant to be driven through
// container/heap (heap.Init, heap.Push, heap.Pop); calling Push and Pop
// directly only appends to / removes from the end of the slice and does not
// maintain heap order.
type PriorityQueue []*StockData

// Len reports the number of queued updates.
func (pq PriorityQueue) Len() int { return len(pq) }

// Less reports whether the element at i must come out before the element at
// j. The update with the later Timestamp wins, so the heap root is always
// the most recent update.
func (pq PriorityQueue) Less(i, j int) bool {
	return pq[i].Timestamp.After(pq[j].Timestamp)
}

// Swap exchanges the elements at positions i and j.
func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
}

// Push appends x to the end of the queue. x must be a *StockData; any other
// dynamic type makes the type assertion panic.
func (pq *PriorityQueue) Push(x interface{}) {
	*pq = append(*pq, x.(*StockData))
}

// Pop removes the last element of the queue and returns it as a *StockData.
// It panics when the queue is empty.
func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	last := len(old) - 1
	item := old[last]
	// Clear the vacated slot: the backing array outlives the shortened
	// slice, and a pointer left behind would keep the popped update
	// reachable (and therefore uncollectable) until it is overwritten.
	old[last] = nil
	*pq = old[:last]
	return item
}
// WebSocketConnection pairs one client's WebSocket with the stocks that
// client has subscribed to.
type WebSocketConnection struct {
	// conn is the underlying gorilla/websocket connection; stock updates are
	// written to it with WriteJSON.
	conn *websocket.Conn
	// subscribedStocks is used as a set keyed by stock name: an update is
	// forwarded to this client only when its StockName maps to true.
	subscribedStocks map[string]bool
}
// clients is the set of connected WebSocket clients that stock updates are
// broadcast to. The map is created eagerly so that registering a client is a
// plain assignment; inserting into a nil map would panic.
//
// NOTE(review): nothing visible in this file registers clients, and the map
// is not guarded by a lock — confirm how clients are meant to be added
// before touching it from more than one goroutine.
var clients = make(map[*WebSocketConnection]bool)
// handleConnection performs a single read of at most 1024 bytes from conn
// and forwards one *StockData on c.
//
// A read that fails with a timeout ends the call quietly. Any other read
// error is fatal to the whole process (log.Fatal). On success the received
// bytes are printed and a pointer to a zero-valued StockData is sent,
// because decoding the payload is still commented out below. The sender's
// address is discarded.
func handleConnection(conn net.PacketConn, c chan<- *StockData) {
	packet := make([]byte, 1024)
	size, _, err := conn.ReadFrom(packet)
	if err != nil {
		netErr, isNetErr := err.(net.Error)
		if !isNetErr || !netErr.Timeout() {
			log.Fatal(err) // not a timeout: log it and exit the program
		}
		return // timeout: nothing was read, so let this goroutine finish
	}

	fmt.Printf("reading stock data : %d\n", packet[:size])

	update := new(StockData)
	// Decoding is disabled for now, so update stays at its zero value:
	// err = json.Unmarshal(packet[:size], update)
	// if err != nil {
	// 	log.Fatal(err)
	// }
	c <- update
}
// startServer opens a UDP socket on 192.168.1.66:7701 and keeps spawning
// goroutines that each run handleConnection once on that socket, forwarding
// whatever it produces on c. At most maxWorkers such goroutines are in
// flight at any time. The function loops forever; failing to open the socket
// terminates the process via log.Fatal.
//
// maxWorkers values below 1 are treated as 1. Without that, 0 would create
// an unbuffered semaphore whose first send blocks forever, and a negative
// value would make the channel allocation panic.
func startServer(c chan<- *StockData, maxWorkers int) {
	if maxWorkers < 1 {
		maxWorkers = 1
	}

	ln, err := net.ListenPacket("udp", "192.168.1.66:7701")
	if err != nil {
		log.Fatal(err)
	}
	defer ln.Close()

	// slots is a counting semaphore: a token is put in before a worker
	// starts and taken out when it finishes.
	slots := make(chan struct{}, maxWorkers)
	for {
		slots <- struct{}{} // blocks while maxWorkers reads are in flight
		go func() {
			// Deferred so the slot is handed back however the worker ends.
			defer func() { <-slots }()
			handleConnection(ln, c)
		}()
		time.Sleep(1 * time.Millisecond) // small delay to avoid creating goroutines too quickly
	}
}
// main wires the pipeline together: startServer feeds stock updates into a
// buffered channel, and a broadcaster goroutine forwards every update to
// each client subscribed to that stock. main itself then blocks forever.
func main() {
	// Buffered channel carrying updates from the UDP readers to the
	// broadcaster.
	dataChannel := make(chan *StockData, 1000)

	// Define your ports
	// ports := []string{"7001"}
	maxWorkers := 100
	// for _, port := range ports {
	// 	go startServer(port, dataChannel, maxWorkers)
	// }
	go startServer(dataChannel, maxWorkers)

	// Broadcaster: handles incoming data and sends it to the frontend.
	go func() {
		pq := &PriorityQueue{}
		heap.Init(pq)
		for data := range dataChannel {
			heap.Push(pq, data)
			// Take the update straight back out. The queue is empty before
			// each push, so this yields the update just received and the
			// send order is unchanged; leaving items queued would retain
			// every update ever seen and grow memory without bound.
			update := heap.Pop(pq).(*StockData)

			for client := range clients {
				if !client.subscribedStocks[update.StockName] {
					continue
				}
				if err := client.conn.WriteJSON(update); err != nil {
					// One broken client must not take the server down for
					// everyone else: drop that client and carry on.
					// Deleting the current key while ranging over a map is
					// safe in Go.
					log.Printf("dropping client after failed write: %v", err)
					_ = client.conn.Close() // best effort; the connection already failed
					delete(clients, client)
				}
			}
		}
	}()

	// Prevent main from exiting.
	select {}
}