forked from postmanlabs/observability-cli
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun.go
78 lines (68 loc) · 1.92 KB
/
run.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package pcap
import (
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/pkg/errors"
"github.com/akitasoftware/akita-cli/trace"
"github.com/akitasoftware/akita-libs/akinet"
akihttp "github.com/akitasoftware/akita-libs/akinet/http"
akihttp2 "github.com/akitasoftware/akita-libs/akinet/http2"
"github.com/akitasoftware/akita-libs/akinet/tls"
"github.com/akitasoftware/akita-libs/buffer_pool"
. "github.com/akitasoftware/akita-libs/client_telemetry"
)
func Collect(
stop <-chan struct{},
intf string,
bpfFilter string,
bufferShare float32,
parseTCPAndTLS bool,
proc trace.Collector,
packetCount trace.PacketCountConsumer,
pool buffer_pool.BufferPool,
) error {
defer proc.Close()
facts := []akinet.TCPParserFactory{
akihttp.NewHTTPRequestParserFactory(pool),
akihttp.NewHTTPResponseParserFactory(pool),
akihttp2.NewHTTP2PrefaceParserFactory(),
}
if parseTCPAndTLS {
facts = append(facts,
tls.NewTLSClientParserFactory(),
tls.NewTLSServerParserFactory(),
)
}
parser := NewNetworkTrafficParser(bufferShare)
if packetCount != nil {
parser.InstallObserver(CountTcpPackets(intf, packetCount))
}
parsedChan, err := parser.ParseFromInterface(intf, bpfFilter, stop, facts...)
if err != nil {
return errors.Wrap(err, "couldn't start parsing from interface")
}
for t := range parsedChan {
t.Interface = intf
err := proc.Process(t)
t.Content.ReleaseBuffers()
if err != nil {
return err
}
}
return nil
}
// Observe every captured TCP segment here
func CountTcpPackets(ifc string, packetCount trace.PacketCountConsumer) NetworkTrafficObserver {
observer := func(p gopacket.Packet) {
if tcpLayer := p.Layer(layers.LayerTypeTCP); tcpLayer != nil {
tcp, _ := tcpLayer.(*layers.TCP)
packetCount.Update(PacketCounts{
Interface: ifc,
SrcPort: int(tcp.SrcPort),
DstPort: int(tcp.DstPort),
TCPPackets: 1,
})
}
}
return NetworkTrafficObserver(observer)
}